XRootD
XrdCl::Stream Class Reference

Stream.

#include <XrdClStream.hh>


Public Types

enum  StreamStatus {
  Disconnected = 0 ,
  Connected = 1 ,
  Connecting = 2 ,
  Error = 3
}
 Status of the stream.
 

Public Member Functions

 Stream (const URL *url, const URL &prefer=URL())
 Constructor.
 
 ~Stream ()
 Destructor.
 
bool CanCollapse (const URL &url)
 
void DisableIfEmpty (uint16_t subStream)
 Disables respective uplink if empty.
 
void Disconnect (bool force=false)
 Disconnect the stream.
 
XRootDStatus EnableLink (PathID &path)
 
void ForceConnect ()
 Force connection.
 
void ForceError (XRootDStatus status)
 Force error.
 
const std::string & GetName () const
 Return stream name.
 
const URL * GetURL () const
 Get the URL.
 
XRootDStatus Initialize ()
 Initializer.
 
uint16_t InspectStatusRsp (uint16_t stream, MsgHandler *&incHandler)
 
MsgHandler * InstallIncHandler (std::shared_ptr< Message > &msg, uint16_t stream)
 
void OnConnect (uint16_t subStream)
 Call back when a substream has been connected.
 
void OnConnectError (uint16_t subStream, XRootDStatus status)
 On connect error.
 
void OnError (uint16_t subStream, XRootDStatus status)
 On error.
 
void OnIncoming (uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
 Call back when a message has been reconstructed.
 
void OnMessageSent (uint16_t subStream, Message *msg, uint32_t bytesSent)
 
bool OnReadTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On read timeout.
 
std::pair< Message *, MsgHandler * > OnReadyToWrite (uint16_t subStream)
 
bool OnWriteTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On write timeout.
 
Status Query (uint16_t query, AnyObject &result)
 Query the stream.
 
void RegisterEventHandler (ChannelEventHandler *handler)
 Register channel event handler.
 
void RemoveEventHandler (ChannelEventHandler *handler)
 Remove a channel event handler.
 
XRootDStatus Send (Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 Queue the message for sending.
 
void SetChannelData (AnyObject *channelData)
 Set the channel data.
 
void SetIncomingQueue (InQueue *incomingQueue)
 Set the incoming queue.
 
void SetJobManager (JobManager *jobManager)
 Set job manager.
 
void SetOnDataConnectHandler (std::shared_ptr< Job > &onConnJob)
 Set the on-connect handler for data streams.
 
void SetPoller (Poller *poller)
 Set the poller.
 
void SetTaskManager (TaskManager *taskManager)
 Set task manager.
 
void SetTransport (TransportHandler *transport)
 Set the transport.
 
void Tick (time_t now)
 

Detailed Description

Stream.

Definition at line 51 of file XrdClStream.hh.

Member Enumeration Documentation

◆ StreamStatus

Status of the stream.

Enumerator
Disconnected 

Not connected.

Connected 

Connected.

Connecting 

In the process of being connected.

Error 

Broken.

Definition at line 57 of file XrdClStream.hh.

58 {
59 Disconnected = 0,
60 Connected = 1,
61 Connecting = 2,
62 Error = 3
63 };
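
As an illustration only, the enumerators can be mapped to printable names, for example for log output. The sketch below redeclares the enum locally so that it compiles without the XRootD headers; StatusName is a hypothetical helper and is not part of XrdCl.

```cpp
#include <string>

// Local mirror of XrdCl::Stream::StreamStatus, redeclared here so that the
// sketch is self-contained.
enum StreamStatus
{
  Disconnected = 0,
  Connected    = 1,
  Connecting   = 2,
  Error        = 3
};

// Hypothetical helper (not part of XrdCl): printable name of a status value.
inline std::string StatusName( StreamStatus status )
{
  switch( status )
  {
    case Disconnected: return "Disconnected";
    case Connected:    return "Connected";
    case Connecting:   return "Connecting";
    case Error:        return "Error";
  }
  return "Unknown";
}
```

Note that the numeric values do not follow the connection sequence: Connected (1) precedes Connecting (2), so range comparisons on the raw values are probably best avoided.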

Constructor & Destructor Documentation

◆ Stream()

XrdCl::Stream::Stream ( const URL * url,
const URL & prefer = URL() )

Constructor.

Definition at line 96 of file XrdClStream.cc.

96 :
97 pUrl( url ),
98 pPrefer( prefer ),
99 pTransport( 0 ),
100 pPoller( 0 ),
101 pTaskManager( 0 ),
102 pJobManager( 0 ),
103 pIncomingQueue( 0 ),
104 pChannelData( 0 ),
105 pLastStreamError( 0 ),
106 pConnectionCount( 0 ),
107 pConnectionInitTime( 0 ),
108 pAddressType( Utils::IPAll ),
109 pSessionId( 0 ),
110 pBytesSent( 0 ),
111 pBytesReceived( 0 )
112 {
113 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
114 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
115
116 std::ostringstream o;
117 o << pUrl->GetHostId();
118 pStreamName = o.str();
119
120 pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
121 DefaultConnectionWindow );
122 pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
123 DefaultConnectionRetry );
124 pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
125 DefaultStreamErrorWindow );
126
127 std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
128 DefaultNetworkStack );
129
130 pAddressType = Utils::String2AddressType( netStack );
131 if( pAddressType == Utils::AddressType::IPAuto )
132 {
133 XrdNetUtils::NetProt stacks = XrdNetUtils::NetConfig( XrdNetUtils::qryINIF );
134 if( !( stacks & XrdNetUtils::hasIP64 ) )
135 {
136 if( stacks & XrdNetUtils::hasIPv4 )
137 pAddressType = Utils::AddressType::IPv4;
138 else if( stacks & XrdNetUtils::hasIPv6 )
139 pAddressType = Utils::AddressType::IPv6;
140 }
141 }
142
143 Log *log = DefaultEnv::GetLog();
144 log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
145 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
146 "Window: %d", pStreamName.c_str(), netStack.c_str(),
147 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
148 }
static Log * GetLog()
Get default log.
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:94
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition XrdClUtils.cc:80
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
@ qryINIF
Only consider internet protocols via ifconfig.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
XrdSysError Log
Definition XrdConfig.cc:111

References XrdCl::Log::Debug(), XrdCl::DefaultConnectionRetry, XrdCl::DefaultConnectionWindow, XrdCl::DefaultNetworkStack, XrdCl::DefaultStreamErrorWindow, XrdCl::URL::GetHostId(), XrdCl::Utils::GetIntParameter(), XrdCl::DefaultEnv::GetLog(), XrdCl::Utils::GetStringParameter(), XrdNetUtils::hasIP64, XrdNetUtils::hasIPv4, XrdNetUtils::hasIPv6, XrdCl::Utils::IPAuto, XrdCl::Utils::IPv4, XrdCl::Utils::IPv6, XrdNetUtils::NetConfig(), XrdCl::PostMasterMsg, XrdNetUtils::qryINIF, and XrdCl::Utils::String2AddressType().


◆ ~Stream()

XrdCl::Stream::~Stream ( )

Destructor.

Definition at line 153 of file XrdClStream.cc.

154 {
155 Disconnect( true );
156
157 Log *log = DefaultEnv::GetLog();
158 log->Debug( PostMasterMsg, "[%s] Destroying stream",
159 pStreamName.c_str() );
160
161 MonitorDisconnection( XRootDStatus() );
162
163 SubStreamList::iterator it;
164 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
165 delete *it;
166 }
void Disconnect(bool force=false)
Disconnect the stream.

References XrdCl::Log::Debug(), Disconnect(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.


Member Function Documentation

◆ CanCollapse()

bool XrdCl::Stream::CanCollapse ( const URL & url)
Returns
true if this channel can be collapsed using this URL, false otherwise

Definition at line 1171 of file XrdClStream.cc.

1172 {
1173 Log *log = DefaultEnv::GetLog();
1174
1175 //--------------------------------------------------------------------------
1176 // Resolve all the addresses of the host we're supposed to connect to
1177 //--------------------------------------------------------------------------
1178 std::vector<XrdNetAddr> prefaddrs;
1179 XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1180 if( !st.IsOK() )
1181 {
1182 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1183 , pStreamName.c_str(), url.GetHostName().c_str() );
1184 return false;
1185 }
1186
1187 //--------------------------------------------------------------------------
1188 // Resolve all the addresses of the alias
1189 //--------------------------------------------------------------------------
1190 std::vector<XrdNetAddr> aliasaddrs;
1191 st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1192 if( !st.IsOK() )
1193 {
1194 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1195 , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1196 return false;
1197 }
1198
1199 //--------------------------------------------------------------------------
1200 // Now check if the preferred host is part of the alias
1201 //--------------------------------------------------------------------------
1202 auto itr = prefaddrs.begin();
1203 for( ; itr != prefaddrs.end() ; ++itr )
1204 {
1205 auto itr2 = aliasaddrs.begin();
1206 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1207 if( itr->Same( &*itr2 ) ) return true;
1208 }
1209
1210 return false;
1211 }
const std::string & GetHostName() const
Get the name of the target host.
Definition XrdClURL.hh:165
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.

References XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::Channel::CanCollapse().
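
The final step of the listing above is a plain intersection test between two resolved address lists. A minimal sketch of that test follows, under the assumption that addresses can be modelled as strings and that string equality stands in for XrdNetAddr::Same(); SharesAddress is a hypothetical name, not an XrdCl function.

```cpp
#include <string>
#include <vector>

// Hypothetical helper (not part of XrdCl). Returns true as soon as one
// address of the preferred host also appears among the alias addresses.
// Plain strings and operator== stand in for XrdNetAddr and Same().
bool SharesAddress( const std::vector<std::string> &preferred,
                    const std::vector<std::string> &alias )
{
  for( const std::string &p : preferred )
    for( const std::string &a : alias )
      if( p == a ) return true;
  return false;
}
```

The nested loop is quadratic in the list sizes, which seems a reasonable trade-off for the handful of addresses a host name usually resolves to.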


◆ DisableIfEmpty()

void XrdCl::Stream::DisableIfEmpty ( uint16_t subStream)

Disables respective uplink if empty.

Definition at line 568 of file XrdClStream.cc.

569 {
570 XrdSysMutexHelper scopedLock( pMutex );
571 Log *log = DefaultEnv::GetLog();
572
573 if( pSubStreams[subStream]->outQueue->IsEmpty() )
574 {
575 log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
576 pSubStreams[subStream]->socket->GetStreamName().c_str() );
577 pSubStreams[subStream]->socket->DisableUplink();
578 }
579 }

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::AsyncSocketHandler::OnWrite().


◆ Disconnect()

void XrdCl::Stream::Disconnect ( bool force = false)

Disconnect the stream.

Definition at line 363 of file XrdClStream.cc.

364 {
365 XrdSysMutexHelper scopedLock( pMutex );
366 SubStreamList::iterator it;
367 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
368 {
369 (*it)->socket->Close();
370 (*it)->status = Socket::Disconnected;
371 }
372 }
@ Disconnected
The socket is disconnected.

References XrdCl::Socket::Disconnected.

Referenced by ~Stream().


◆ EnableLink()

XRootDStatus XrdCl::Stream::EnableLink ( PathID & path)

Connect if needed; otherwise make sure that the underlying socket handler gets write-readiness events. The path is updated to reflect what has actually been enabled.
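
The way the path is adjusted when the main stream is already connected can be sketched in isolation. The types below are local stand-ins for Socket status values and XrdCl::PathID, and SelectUplink is a hypothetical function that only mirrors the fall-back rules of the listing that follows; it returns the substream whose uplink would be enabled, or -1 when nothing is enabled.

```cpp
#include <cstdint>
#include <vector>

// Local stand-ins, declared here so the sketch compiles on its own.
enum class SockStatus { Disconnected, Connected, Connecting };
struct PathID { uint16_t up; uint16_t down; };

// Hypothetical helper (not part of XrdCl). Applies the fall-back rules to
// 'path' and returns the index of the substream whose uplink should be
// enabled, or -1 if no uplink is enabled by this call.
int SelectUplink( const std::vector<SockStatus> &subs, PathID &path )
{
  // This sketch only covers the case where the main stream is connected
  if( subs[0] != SockStatus::Connected )
    return -1;

  // A downlink that is not ready falls back to stream 0
  if( subs[path.down] != SockStatus::Connected )
    path.down = 0;

  // A disconnected uplink falls back to stream 0 as well
  if( subs[path.up] == SockStatus::Disconnected )
  {
    path.up = 0;
    return 0;
  }

  // A connected uplink is used as requested
  if( subs[path.up] == SockStatus::Connected )
    return path.up;

  // The uplink substream is still connecting: leave the path untouched
  return -1;
}
```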

Definition at line 187 of file XrdClStream.cc.

188 {
189 XrdSysMutexHelper scopedLock( pMutex );
190
191 //--------------------------------------------------------------------------
192 // We are in the process of connecting the main stream, so we do nothing
193 // because when the main stream connection is established it will connect
194 // all the other streams
195 //--------------------------------------------------------------------------
196 if( pSubStreams[0]->status == Socket::Connecting )
197 return XRootDStatus();
198
199 //--------------------------------------------------------------------------
200 // The main stream is connected, so we can verify whether we have
201 // the up and the down stream connected and ready to handle data.
202 // If anything is not right we fall back to stream 0.
203 //--------------------------------------------------------------------------
204 if( pSubStreams[0]->status == Socket::Connected )
205 {
206 if( pSubStreams[path.down]->status != Socket::Connected )
207 path.down = 0;
208
209 if( pSubStreams[path.up]->status == Socket::Disconnected )
210 {
211 path.up = 0;
212 return pSubStreams[0]->socket->EnableUplink();
213 }
214
215 if( pSubStreams[path.up]->status == Socket::Connected )
216 return pSubStreams[path.up]->socket->EnableUplink();
217
218 return XRootDStatus();
219 }
220
221 //--------------------------------------------------------------------------
222 // The main stream is not connected, we need to check whether enough time
223 // has passed since we last encountered an error (if any) so that we could
224 // re-attempt the connection
225 //--------------------------------------------------------------------------
226 Log *log = DefaultEnv::GetLog();
227 time_t now = ::time(0);
228
229 if( now-pLastStreamError < pStreamErrorWindow )
230 return pLastFatalError;
231
232 gettimeofday( &pConnectionStarted, 0 );
233 ++pConnectionCount;
234
235 //--------------------------------------------------------------------------
236 // Resolve all the addresses of the host we're supposed to connect to
237 //--------------------------------------------------------------------------
238 XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
239 if( !st.IsOK() )
240 {
241 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
242 "the host", pStreamName.c_str() );
243 pLastStreamError = now;
244 st.status = stFatal;
245 pLastFatalError = st;
246 return st;
247 }
248
249 if( pPrefer.IsValid() )
250 {
251 std::vector<XrdNetAddr> addrresses;
252 XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
253 if( !st.IsOK() )
254 {
255 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
256 pStreamName.c_str(), pPrefer.GetHostName().c_str() );
257 }
258 else
259 {
260 std::vector<XrdNetAddr> tmp;
261 tmp.reserve( pAddresses.size() );
262 // first add all remaining addresses
263 auto itr = pAddresses.begin();
264 for( ; itr != pAddresses.end() ; ++itr )
265 {
266 if( !HasNetAddr( *itr, addrresses ) )
267 tmp.push_back( *itr );
268 }
269 // then copy all 'preferred' addresses
270 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
271 // and keep the result
272 pAddresses.swap( tmp );
273 }
274 }
275
276 Utils::LogHostAddresses( log, PostMasterMsg, pUrl->GetHostId(),
277 pAddresses );
278
279 while( !pAddresses.empty() )
280 {
281 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
282 pAddresses.pop_back();
283 pConnectionInitTime = ::time( 0 );
284 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
285 if( st.IsOK() )
286 {
287 pSubStreams[0]->status = Socket::Connecting;
288 break;
289 }
290 }
291 return st;
292 }
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:438
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
const uint16_t stFatal
Fatal error, it's still an error.

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostId(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), XrdCl::Utils::LogHostAddresses(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stFatal, and XrdCl::PathID::up.

Referenced by ForceConnect(), OnConnectError(), OnError(), and Send().
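
Because the connection loop takes candidates from the back of pAddresses, moving the preferred addresses to the end of the list means they are tried first. The reordering step can be sketched as follows, assuming addresses are modelled as strings; OrderForConnect is a hypothetical name and std::find stands in for the HasNetAddr check.

```cpp
#include <algorithm>
#include <iterator>
#include <string>
#include <vector>

// Hypothetical helper (not part of XrdCl). Returns the candidate list with
// all non-preferred addresses first and the preferred ones last, so that a
// consumer popping from the back tries the preferred addresses first.
std::vector<std::string> OrderForConnect( const std::vector<std::string> &all,
                                          const std::vector<std::string> &preferred )
{
  std::vector<std::string> result;
  result.reserve( all.size() + preferred.size() );

  // first add all remaining (non-preferred) addresses
  for( const std::string &addr : all )
    if( std::find( preferred.begin(), preferred.end(), addr ) == preferred.end() )
      result.push_back( addr );

  // then copy all preferred addresses
  std::copy( preferred.begin(), preferred.end(), std::back_inserter( result ) );
  return result;
}
```

As in the listing, every preferred address is appended, including one that was not among the addresses resolved for the stream URL.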


◆ ForceConnect()

void XrdCl::Stream::ForceConnect ( )

Force connection.

Definition at line 347 of file XrdClStream.cc.

348 {
349 XrdSysMutexHelper scopedLock( pMutex );
350 if( pSubStreams[0]->status == Socket::Connecting )
351 {
352 pSubStreams[0]->status = Socket::Disconnected;
353 XrdCl::PathID path( 0, 0 );
354 XrdCl::XRootDStatus st = EnableLink( path );
355 if( !st.IsOK() )
356 OnConnectError( 0, st );
357 }
358 }
XRootDStatus EnableLink(PathID &path)
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
bool IsOK() const
We're fine.

References XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Status::IsOK(), and OnConnectError().

Referenced by XrdCl::Channel::ForceReconnect().


◆ ForceError()

void XrdCl::Stream::ForceError ( XRootDStatus status)

Force error.

Definition at line 913 of file XrdClStream.cc.

914 {
915 XrdSysMutexHelper scopedLock( pMutex );
916 Log *log = DefaultEnv::GetLog();
917 for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
918 {
919 if( pSubStreams[substream]->status != Socket::Connected ) continue;
920 pSubStreams[substream]->socket->Close();
921 pSubStreams[substream]->status = Socket::Disconnected;
922 log->Error( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
923 pStreamName.c_str(), status.ToString().c_str() );
924
925 //--------------------------------------------------------------------
926 // Reinsert the stuff that we have failed to send
927 //--------------------------------------------------------------------
928 if( pSubStreams[substream]->outMsgHelper.msg )
929 {
930 OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
931 pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
932 h.stateful );
933 pSubStreams[substream]->outMsgHelper.Reset();
934 }
935
936 //--------------------------------------------------------------------
937 // Reinsert the receiving handler and reset any partially read partial
938 //--------------------------------------------------------------------
939 if( pSubStreams[substream]->inMsgHelper.handler )
940 {
941 InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
942 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
943 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
944 if( xrdHandler ) xrdHandler->PartialReceived();
945 h.Reset();
946 }
947 }
948
949 pConnectionCount = 0;
950
951 //------------------------------------------------------------------------
952 // We're done here, unlock the stream mutex to avoid deadlocks and
953 // report the disconnection event to the handlers
954 //------------------------------------------------------------------------
955 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
956 "message handlers.", pStreamName.c_str() );
957
958 SubStreamList::iterator it;
959 OutQueue q;
960 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
961 q.GrabItems( *(*it)->outQueue );
962 scopedLock.UnLock();
963
964 q.Report( status );
965
966 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
967 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
968 }
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
@ Broken
The stream is broken.

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::Log::Error(), XrdCl::InMessageHelper::expires, XrdCl::OutQueue::MsgHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabItems(), XrdCl::InMessageHelper::handler, XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::Channel::ForceDisconnect(), and XrdCl::AsyncSocketHandler::OnHeaderCorruption().
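
The listing above collects the queued items while holding the stream mutex and releases the mutex before reporting to the handlers. A minimal sketch of that pattern is given below, using std::mutex instead of XrdSysMutexHelper; MiniStream and its members are hypothetical and only illustrate why the unlock matters.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <string>

// Hypothetical miniature of a stream with a single outgoing queue; this is
// an illustration of the locking pattern, not XrdCl code.
class MiniStream
{
  public:
    void Queue( const std::string &msg )
    {
      std::lock_guard<std::mutex> lock( pMutex );
      pOutQueue.push_back( msg );
    }

    std::size_t Pending()
    {
      std::lock_guard<std::mutex> lock( pMutex );
      return pOutQueue.size();
    }

    // Grab everything under the lock, release the lock, then report.
    void ForceError( const std::function<void( const std::string& )> &report )
    {
      std::unique_lock<std::mutex> lock( pMutex );
      std::deque<std::string> grabbed;
      grabbed.swap( pOutQueue );
      lock.unlock();

      // The callback may call back into Queue(); since the mutex has been
      // released, this does not deadlock.
      for( const std::string &msg : grabbed )
        report( msg );
    }

  private:
    std::mutex              pMutex;
    std::deque<std::string> pOutQueue;
};
```

If the lock were still held during the loop, a handler that re-queued its message on the same stream would block forever on the non-recursive mutex.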


◆ GetName()

const std::string & XrdCl::Stream::GetName ( ) const
inline

Return stream name.

Definition at line 170 of file XrdClStream.hh.

171 {
172 return pStreamName;
173 }

◆ GetURL()

const URL * XrdCl::Stream::GetURL ( ) const
inline

Get the URL.

Definition at line 157 of file XrdClStream.hh.

158 {
159 return pUrl;
160 }

Referenced by XrdCl::AsyncSocketHandler::OnConnectionReturn().


◆ Initialize()

XRootDStatus XrdCl::Stream::Initialize ( )

Initializer.

Definition at line 171 of file XrdClStream.cc.

172 {
173 if( !pTransport || !pPoller || !pChannelData )
174 return XRootDStatus( stError, errUninitialized );
175
176 AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
177 pChannelData, 0, this );
178 pSubStreams.push_back( new SubStreamData() );
179 pSubStreams[0]->socket = s;
180 return XRootDStatus();
181 }
const uint16_t errUninitialized
const uint16_t stError
An error occurred that could potentially be retried.

References XrdCl::errUninitialized, and XrdCl::stError.

Referenced by XrdCl::Channel::Channel().


◆ InspectStatusRsp()

uint16_t XrdCl::Stream::InspectStatusRsp ( uint16_t stream,
MsgHandler *& incHandler )

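If the message is a kXR_status response, it needs further attention.

Returns
an action code (one of the MsgHandler flags Raw, Corrupted, More or None); when raw data needs to be read out, incHandler is set to the handler that will read it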

Definition at line 1140 of file XrdClStream.cc.

1142 {
1143 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1144 if( !mh.handler )
1146
1147 uint16_t action = mh.handler->InspectStatusRsp();
1148 mh.action |= action;
1149
1150 if( action & MsgHandler::RemoveHandler )
1151 pIncomingQueue->RemoveMessageHandler( mh.handler );
1152
1153 if( action & MsgHandler::Raw )
1154 {
1155 incHandler = mh.handler;
1156 return MsgHandler::Raw;
1157 }
1158
1159 if( action & MsgHandler::Corrupted )
1160 return MsgHandler::Corrupted;
1161
1162 if( action & MsgHandler::More )
1163 return MsgHandler::More;
1164
1165 return MsgHandler::None;
1166 }
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
@ More
there are more (non-raw) data to be read

References XrdCl::InMessageHelper::action, XrdCl::MsgHandler::Corrupted, XrdCl::InMessageHelper::handler, XrdCl::MsgHandler::InspectStatusRsp(), XrdCl::MsgHandler::More, XrdCl::MsgHandler::None, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, and XrdCl::InQueue::RemoveMessageHandler().

Referenced by XrdCl::AsyncMsgReader::Read().
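
The tail of the listing reduces a bit mask of actions to a single return value with a fixed precedence: Raw, then Corrupted, then More, then None. The sketch below isolates that reduction. The bit values are assumptions chosen for the example and are not claimed to match those defined in MsgHandler; ReduceAction is a hypothetical name.

```cpp
#include <cstdint>

// Assumed flag values for illustration only; the real constants live in
// XrdCl::MsgHandler and may differ.
enum Action : uint16_t
{
  None          = 0x00,
  Raw           = 0x01,
  RemoveHandler = 0x02,
  Corrupted     = 0x04,
  More          = 0x08
};

// Hypothetical helper (not part of XrdCl): pick the single action reported
// to the caller, using the same precedence as the listing.
uint16_t ReduceAction( uint16_t action )
{
  if( action & Raw )       return Raw;
  if( action & Corrupted ) return Corrupted;
  if( action & More )      return More;
  return None;
}
```

RemoveHandler is acted upon inside the function (the handler is removed from the incoming queue) but is never itself the reduced result.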


◆ InstallIncHandler()

MsgHandler * XrdCl::Stream::InstallIncHandler ( std::shared_ptr< Message > & msg,
uint16_t stream )

Install a message handler for the given message if one is available. If the handler wants to be called in raw mode, it is returned.

Parameters
msg  message header
stream  stream concerned
Returns
the handler if it wants to be called in raw mode, nullptr otherwise

Definition at line 1119 of file XrdClStream.cc.

1120 {
1121 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1122 if( !mh.handler )
1123 mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1124 mh.expires,
1125 mh.action );
1126
1127 if( !mh.handler )
1128 return nullptr;
1129
1130 if( mh.action & MsgHandler::Raw )
1131 return mh.handler;
1132 return nullptr;
1133 }
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)

References XrdCl::InMessageHelper::action, XrdCl::InMessageHelper::expires, XrdCl::InQueue::GetHandlerForMessage(), XrdCl::InMessageHelper::handler, and XrdCl::MsgHandler::Raw.

Referenced by XrdCl::AsyncMsgReader::Read().


◆ OnConnect()

void XrdCl::Stream::OnConnect ( uint16_t subStream)

Call back when a substream has been connected.

Definition at line 610 of file XrdClStream.cc.

611 {
612 XrdSysMutexHelper scopedLock( pMutex );
613 pSubStreams[subStream]->status = Socket::Connected;
614
615 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
616 Log *log = DefaultEnv::GetLog();
617 log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
618 subStream, ipstack.c_str() );
619
620 if( subStream == 0 )
621 {
622 pLastStreamError = 0;
623 pLastFatalError = XRootDStatus();
624 pConnectionCount = 0;
625 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
626 pSessionId = ++sSessCntGen;
627
628 //------------------------------------------------------------------------
629 // Create the streams if they don't exist yet
630 //------------------------------------------------------------------------
631 if( pSubStreams.size() == 1 && numSub > 1 )
632 {
633 for( uint16_t i = 1; i < numSub; ++i )
634 {
635 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
636 AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
637 pChannelData, i, this );
638 pSubStreams.push_back( new SubStreamData() );
639 pSubStreams[i]->socket = s;
640 }
641 }
642
643 //------------------------------------------------------------------------
644 // Connect the extra streams, if we fail we move all the outgoing items
645 // to stream 0, we don't need to enable the uplink here, because it
646 // should be already enabled after the handshaking process is completed.
647 //------------------------------------------------------------------------
648 if( pSubStreams.size() > 1 )
649 {
650 log->Debug( PostMasterMsg, "[%s] Attempting to connect %d additional "
651 "streams.", pStreamName.c_str(), pSubStreams.size()-1 );
652 for( size_t i = 1; i < pSubStreams.size(); ++i )
653 {
654 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
655 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
656 if( !st.IsOK() )
657 {
658 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
659 pSubStreams[i]->socket->Close();
660 }
661 else
662 {
663 pSubStreams[i]->status = Socket::Connecting;
664 }
665 }
666 }
667
668 //------------------------------------------------------------------------
669 // Inform monitoring
670 //------------------------------------------------------------------------
671 pBytesSent = 0;
672 pBytesReceived = 0;
673 gettimeofday( &pConnectionDone, 0 );
674 Monitor *mon = DefaultEnv::GetMonitor();
675 if( mon )
676 {
677 Monitor::ConnectInfo i;
678 i.server = pUrl->GetHostId();
679 i.sTOD = pConnectionStarted;
680 i.eTOD = pConnectionDone;
681 i.streams = pSubStreams.size();
682
683 AnyObject qryResult;
684 std::string *qryResponse = 0;
685 pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
686 qryResult.Get( qryResponse );
687 i.auth = *qryResponse;
688 delete qryResponse;
689 mon->Event( Monitor::EvConnect, &i );
690 }
691
692 //------------------------------------------------------------------------
693 // For every connected control-stream call the global on-connect handler
694 //------------------------------------------------------------------------
695 DefaultEnv::GetPostMaster()->NotifyConnectHandler( *pUrl );
696 }
697 else if( pOnDataConnJob )
698 {
699 //------------------------------------------------------------------------
700 // For every connected data-stream call the on-connect handler
701 //------------------------------------------------------------------------
702 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
703 }
704 }
static Monitor * GetMonitor()
Get the monitor object.
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
@ EvConnect
ConnectInfo: Login into a server.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
static const uint16_t Auth
Transport name, returns std::string *.

References XrdCl::TransportQuery::Auth, XrdCl::Monitor::ConnectInfo::auth, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Monitor::ConnectInfo::eTOD, XrdCl::Monitor::EvConnect, XrdCl::Monitor::Event(), XrdCl::AnyObject::Get(), XrdCl::TransportHandler::GetBindPreference(), XrdCl::URL::GetHostId(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnectHandler(), XrdCl::PostMasterMsg, XrdCl::TransportHandler::Query(), XrdCl::JobManager::QueueJob(), XrdCl::Monitor::ConnectInfo::server, XrdCl::Monitor::ConnectInfo::sTOD, XrdCl::Monitor::ConnectInfo::streams, and XrdCl::TransportHandler::SubStreamNumber().

Referenced by XrdCl::AsyncSocketHandler::HandShakeNextStep().
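
When an additional substream cannot start connecting, its queued messages are handed over to stream 0. The sketch below models that fall-back with plain containers. It assumes that the moved items are appended at the end of the main queue, which the listing does not state; SubStream, ConnectExtraStreams and the connectOk vector (a stand-in for the result of each Connect call) are hypothetical.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical miniature of a substream: an outgoing queue and a flag.
struct SubStream
{
  std::deque<std::string> outQueue;
  bool                    connecting = false;
};

// Hypothetical helper (not part of XrdCl). connectOk[i] simulates whether
// the connect attempt of substream i could be started.
void ConnectExtraStreams( std::vector<SubStream> &subs,
                          const std::vector<bool> &connectOk )
{
  for( std::size_t i = 1; i < subs.size(); ++i )
  {
    if( connectOk[i] )
    {
      subs[i].connecting = true;
      continue;
    }

    // Connect failed: move everything queued on substream i to stream 0
    // (assumed here to be appended at the end of the main queue)
    std::deque<std::string> &q0 = subs[0].outQueue;
    q0.insert( q0.end(), subs[i].outQueue.begin(), subs[i].outQueue.end() );
    subs[i].outQueue.clear();
  }
}
```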


◆ OnConnectError()

void XrdCl::Stream::OnConnectError ( uint16_t subStream,
XRootDStatus status )

On connect error.

Definition at line 709 of file XrdClStream.cc.

710 {
711 XrdSysMutexHelper scopedLock( pMutex );
712 Log *log = DefaultEnv::GetLog();
713 pSubStreams[subStream]->socket->Close();
714 time_t now = ::time(0);
715
716 //--------------------------------------------------------------------------
717 // For every connection error call the global connection error handler
718 //--------------------------------------------------------------------------
719 DefaultEnv::GetPostMaster()->NotifyConnErrHandler( *pUrl, status );
720
721 //--------------------------------------------------------------------------
722 // If we connected subStream == 0 and cannot connect >0 then we just give
723 // up and move the outgoing messages to another queue
724 //--------------------------------------------------------------------------
725 if( subStream > 0 )
726 {
727 pSubStreams[subStream]->status = Socket::Disconnected;
728 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
729 if( pSubStreams[0]->status == Socket::Connected )
730 {
731 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
732 if( !st.IsOK() )
733 OnFatalError( 0, st, scopedLock );
734 return;
735 }
736
737 if( pSubStreams[0]->status == Socket::Connecting )
738 return;
739
740 OnFatalError( subStream, status, scopedLock );
741 return;
742 }
743
744 //--------------------------------------------------------------------------
745 // Check if we still have time to try and do something in the current window
746 //--------------------------------------------------------------------------
747 time_t elapsed = now-pConnectionInitTime;
748 log->Error( PostMasterMsg, "[%s] elapsed = %d, pConnectionWindow = %d "
749 "seconds.", pStreamName.c_str(), elapsed, pConnectionWindow );
750
751 //------------------------------------------------------------------------
752 // If we have some IP addresses left we try them
753 //------------------------------------------------------------------------
754 if( !pAddresses.empty() )
755 {
756 XRootDStatus st;
757 do
758 {
759 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
760 pAddresses.pop_back();
761 pConnectionInitTime = ::time( 0 );
762 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
763 }
764 while( !pAddresses.empty() && !st.IsOK() );
765
766 if( !st.IsOK() )
767 OnFatalError( subStream, st, scopedLock );
768
769 return;
770 }
771 //------------------------------------------------------------------------
772 // If we still can retry with the same host name, we sleep until the end
773 // of the connection window and try
774 //------------------------------------------------------------------------
775 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
776 && !status.IsFatal() )
777 {
778 log->Info( PostMasterMsg, "[%s] Attempting reconnection in %d "
779 "seconds.", pStreamName.c_str(), pConnectionWindow-elapsed );
780
781 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
782 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
783 return;
784 }
785 //--------------------------------------------------------------------------
786 // We are out of the connection window, the only thing we can do here
787 // is re-resolving the host name and retrying if we still can
788 //--------------------------------------------------------------------------
789 else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
790 {
791 pAddresses.clear();
792 pSubStreams[0]->status = Socket::Disconnected;
793 PathID path( 0, 0 );
794 XRootDStatus st = EnableLink( path );
795 if( !st.IsOK() )
796 OnFatalError( subStream, st, scopedLock );
797 return;
798 }
799
800 //--------------------------------------------------------------------------
801 // Else, we fail
802 //--------------------------------------------------------------------------
803 OnFatalError( subStream, status, scopedLock );
804 }

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Log::Info(), XrdCl::Status::IsFatal(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnErrHandler(), XrdCl::PostMasterMsg, and XrdCl::TaskManager::RegisterTask().

Referenced by ForceConnect(), XrdCl::AsyncSocketHandler::OnConnectionReturn(), and XrdCl::AsyncSocketHandler::OnFaultWhileHandshaking().
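The branch order for substream 0 in the listing above can be summarised as a small decision function. This is only a sketch: `RetryAction`, `RetryState` and `DecideRetry` are hypothetical names introduced for illustration and are not part of XrdCl; the fields stand in for `pAddresses`, `pConnectionWindow`, `pConnectionCount`, `pConnectionRetry` and `status.IsFatal()`.

```cpp
#include <cstddef>
#include <ctime>

// Hypothetical names: they mirror the branches of the listing, not an XrdCl API.
enum class RetryAction { TryNextAddress, WaitForWindowEnd, ReResolve, Fail };

struct RetryState
{
  std::size_t addressesLeft;    // resolved addresses not yet tried
  std::time_t elapsed;          // seconds since the current attempt started
  std::time_t connectionWindow; // length of one connection window, in seconds
  unsigned    connectionCount;  // attempts made so far
  unsigned    connectionRetry;  // maximum number of attempts
  bool        fatal;            // whether the reported status is fatal
};

RetryAction DecideRetry( const RetryState &s )
{
  // Remaining addresses are tried first, regardless of the retry budget
  if( s.addressesLeft > 0 )
    return RetryAction::TryNextAddress;

  const bool canRetry = s.connectionCount < s.connectionRetry && !s.fatal;

  // Still inside the window: retry the same host once the window ends
  if( canRetry && s.elapsed < s.connectionWindow )
    return RetryAction::WaitForWindowEnd;

  // Window exhausted: resolve the host name again and reconnect
  if( canRetry )
    return RetryAction::ReResolve;

  return RetryAction::Fail;
}
```

The sketch leaves out the peripheral-substream case (`subStream > 0`), which the listing handles before any of these checks.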


◆ OnError()

void XrdCl::Stream::OnError ( uint16_t subStream,
XRootDStatus status )

On error.

Definition at line 809 of file XrdClStream.cc.

810 {
811 XrdSysMutexHelper scopedLock( pMutex );
812 Log *log = DefaultEnv::GetLog();
813 pSubStreams[subStream]->socket->Close();
814 pSubStreams[subStream]->status = Socket::Disconnected;
815
816 log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
817 pStreamName.c_str(), subStream, status.ToString().c_str() );
818
819 //--------------------------------------------------------------------------
820 // Reinsert the message that we have failed to send
821 //--------------------------------------------------------------------------
822 if( pSubStreams[subStream]->outMsgHelper.msg )
823 {
824 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
825 pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
826 h.stateful );
827 pSubStreams[subStream]->outMsgHelper.Reset();
828 }
829
830 //--------------------------------------------------------------------------
831 // Reinsert the receiving handler and reset any partially read response
832 //--------------------------------------------------------------------------
833 if( pSubStreams[subStream]->inMsgHelper.handler )
834 {
835 InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
836 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
837 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
838 if( xrdHandler ) xrdHandler->PartialReceived();
839 h.Reset();
840 }
841
842 //--------------------------------------------------------------------------
843 // We are dealing with an error of a peripheral stream. If we don't have
844 // anything to send don't bother recovering. Otherwise move the requests
845 // to stream 0 if possible.
846 //--------------------------------------------------------------------------
847 if( subStream > 0 )
848 {
849 if( pSubStreams[subStream]->outQueue->IsEmpty() )
850 return;
851
852 if( pSubStreams[0]->status != Socket::Disconnected )
853 {
854 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
855 if( pSubStreams[0]->status == Socket::Connected )
856 {
857 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
858 if( !st.IsOK() )
859 OnFatalError( 0, st, scopedLock );
860 return;
861 }
862 }
863 OnFatalError( subStream, status, scopedLock );
864 return;
865 }
866
867 //--------------------------------------------------------------------------
868 // If we lost stream 0 we have lost the session. We re-enable the
869 // stream if we still have things in one of the outgoing queues, otherwise
870 // there is no point in recovering at this point.
871 //--------------------------------------------------------------------------
872 if( subStream == 0 )
873 {
874 MonitorDisconnection( status );
875
876 SubStreamList::iterator it;
877 size_t outstanding = 0;
878 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
879 outstanding += (*it)->outQueue->GetSizeStateless();
880
881 if( outstanding )
882 {
883 PathID path( 0, 0 );
884 XRootDStatus st = EnableLink( path );
885 if( !st.IsOK() )
886 {
887 OnFatalError( 0, st, scopedLock );
888 return;
889 }
890 }
891
892 //------------------------------------------------------------------------
893 // We're done here, unlock the stream mutex to avoid deadlocks and
894 // report the disconnection event to the handlers
895 //------------------------------------------------------------------------
896 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
897 "message handlers.", pStreamName.c_str() );
898 OutQueue q;
899 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
900 q.GrabStateful( *(*it)->outQueue );
901 scopedLock.UnLock();
902
903 q.Report( status );
904 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
905 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
906 return;
907 }
908 }

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, EnableLink(), XrdCl::InMessageHelper::expires, XrdCl::OutQueue::MsgHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabStateful(), XrdCl::InMessageHelper::handler, XrdCl::OutQueue::MsgHelper::handler, XrdCl::Status::IsOK(), XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnFault(), and OnReadTimeout().
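The peripheral-substream branch of OnError() can be modelled in isolation as follows. This is a minimal sketch, not the XrdCl implementation: `SubStreamModel`, `LinkStatus`, `Recovery` and `RecoverPeripheral` are hypothetical names, plain strings stand in for queued messages, and appending to the end of the main queue is an assumption about what `GrabItems` does.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-ins for Socket status values and SubStreamData
enum class LinkStatus { Disconnected, Connected, Connecting };

struct SubStreamModel
{
  LinkStatus              status = LinkStatus::Disconnected;
  std::deque<std::string> outQueue; // pending requests, oldest first
};

enum class Recovery { NothingToDo, MovedToMain, Fatal };

// Decide what happens to requests queued on a failed peripheral substream
Recovery RecoverPeripheral( std::vector<SubStreamModel> &streams,
                            std::size_t                  failed )
{
  SubStreamModel &primary = streams[0];
  SubStreamModel &broken  = streams[failed];
  broken.status = LinkStatus::Disconnected;

  // Nothing queued on the failed substream: no recovery needed
  if( broken.outQueue.empty() )
    return Recovery::NothingToDo;

  if( primary.status != LinkStatus::Disconnected )
  {
    // Hand the pending requests over to stream 0 (assumed to append)
    primary.outQueue.insert( primary.outQueue.end(),
                             broken.outQueue.begin(), broken.outQueue.end() );
    broken.outQueue.clear();
    if( primary.status == LinkStatus::Connected )
      return Recovery::MovedToMain;
  }

  // Stream 0 is not usable right now, as in the OnFatalError() call above
  return Recovery::Fatal;
}
```

In the real method the `MovedToMain` case additionally re-enables the uplink of stream 0, which may itself fail.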


◆ OnIncoming()

void XrdCl::Stream::OnIncoming ( uint16_t subStream,
std::shared_ptr< Message > msg,
uint32_t bytesReceived )

Call back when a message has been reconstructed.

Definition at line 471 of file XrdClStream.cc.

474 {
475 msg->SetSessionId( pSessionId );
476 pBytesReceived += bytesReceived;
477
478 MsgHandler *handler = nullptr;
479 uint16_t action = 0;
480 {
481 InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
482 handler = mh.handler;
483 action = mh.action;
484 mh.Reset();
485 }
486
487 if( !IsPartial( *msg ) )
488 {
489 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
490 *pChannelData );
491 if( streamAction & TransportHandler::DigestMsg )
492 return;
493
494 if( streamAction & TransportHandler::RequestClose )
495 {
496 RequestClose( *msg );
497 return;
498 }
499 }
500
501 Log *log = DefaultEnv::GetLog();
502
503 //--------------------------------------------------------------------------
504 // No handler, we discard the message ...
505 //--------------------------------------------------------------------------
506 if( !handler )
507 {
508 ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
509 log->Warning( PostMasterMsg, "[%s] Discarding received message: 0x%x "
510 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
511 pStreamName.c_str(), msg.get(), rsp->hdr.status,
512 rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
513 return;
514 }
515
516 //--------------------------------------------------------------------------
517 // We have a handler, so we call the callback
518 //--------------------------------------------------------------------------
519 log->Dump( PostMasterMsg, "[%s] Handling received message: 0x%x.",
520 pStreamName.c_str(), msg.get() );
521
522 if( action & (MsgHandler::NoProcess|MsgHandler::Ignore) )
523 {
524 log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: 0x%x.",
525 pStreamName.c_str(), msg->GetDescription().c_str() );
526
527 // if we are handling partial response we have to take down the timeout fence
528 if( IsPartial( *msg ) )
529 {
530 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
531 if( xrdHandler ) xrdHandler->PartialReceived();
532 }
533
534 return;
535 }
536
537 Job *job = new HandleIncMsgJob( handler );
538 pJobManager->QueueJob( job );
539 }

References XrdCl::InMessageHelper::action, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::InMessageHelper::handler, ServerResponse::hdr, XrdCl::MsgHandler::Ignore, XrdCl::TransportHandler::MessageReceived(), XrdCl::MsgHandler::NoProcess, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::JobManager::QueueJob(), XrdCl::TransportHandler::RequestClose, XrdCl::InMessageHelper::Reset(), ServerResponseHeader::status, ServerResponseHeader::streamid, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgReader::Read().


◆ OnMessageSent()

void XrdCl::Stream::OnMessageSent ( uint16_t subStream,
Message * msg,
uint32_t bytesSent )

Definition at line 584 of file XrdClStream.cc.

587 {
588 pTransport->MessageSent( msg, subStream, bytesSent,
589 *pChannelData );
590 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
591 pBytesSent += bytesSent;
592 if( h.handler )
593 {
594 h.handler->OnStatusReady( msg, XRootDStatus() );
595 bool rmMsg = false;
596 pIncomingQueue->AddMessageHandler( h.handler, h.handler->GetExpiration(), rmMsg );
597 if( rmMsg )
598 {
599 Log *log = DefaultEnv::GetLog();
600 log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
601 pStreamName.c_str(), subStream );
602 }
603 }
604 pSubStreams[subStream]->outMsgHelper.Reset();
605 }

References XrdCl::InQueue::AddMessageHandler(), XrdCl::MsgHandler::GetExpiration(), XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::TransportHandler::MessageSent(), XrdCl::MsgHandler::OnStatusReady(), XrdCl::PostMasterMsg, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgWriter::Write().


◆ OnReadTimeout()

bool XrdCl::Stream::OnReadTimeout ( uint16_t subStream)

On read timeout.

Definition at line 1027 of file XrdClStream.cc.

1028 {
1029 //--------------------------------------------------------------------------
1030 // We only take the main stream into account
1031 //--------------------------------------------------------------------------
1032 if( substream != 0 )
1033 return true;
1034
1035 //--------------------------------------------------------------------------
1036 // Check if there are no outgoing messages and if the stream TTL has elapsed.
1037 // It is assumed that the underlying transport makes sure that there are no
1038 // pending requests that are not answered, i.e. all possible virtual streams
1039 // are de-allocated
1040 //--------------------------------------------------------------------------
1041 Log *log = DefaultEnv::GetLog();
1042 SubStreamList::iterator it;
1043 time_t now = time(0);
1044
1045 XrdSysMutexHelper scopedLock( pMutex );
1046 uint32_t outgoingMessages = 0;
1047 time_t lastActivity = 0;
1048 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1049 {
1050 outgoingMessages += (*it)->outQueue->GetSize();
1051 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1052 if( lastActivity < sockLastActivity )
1053 lastActivity = sockLastActivity;
1054 }
1055
1056 if( !outgoingMessages )
1057 {
1058 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1059 *pChannelData );
1060 if( disconnect )
1061 {
1062 log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1063 pStreamName.c_str() );
1064 scopedLock.UnLock();
1065 //----------------------------------------------------------------------
1066 // Important note!
1067 //
1068 // This destroys the Stream object itself, the underlying
1069 // AsyncSocketHandler object (that called this method) and the Channel
1070 // object that aggregates this Stream.
1071 //----------------------------------------------------------------------
1072 DefaultEnv::GetPostMaster()->ForceDisconnect( *pUrl );
1073 return false;
1074 }
1075 }
1076
1077 //--------------------------------------------------------------------------
1078 // Check if the stream is broken
1079 //--------------------------------------------------------------------------
1080 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1081 *pChannelData );
1082 if( !st.IsOK() )
1083 {
1084 scopedLock.UnLock();
1085 OnError( substream, st );
1086 return false;
1087 }
1088 return true;
1089 }
References XrdCl::Log::Debug(), XrdCl::PostMaster::ForceDisconnect(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::IsStreamBroken(), XrdCl::TransportHandler::IsStreamTTLElapsed(), OnError(), XrdCl::PostMasterMsg, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnReadTimeout().
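The idle-time computation in OnReadTimeout() takes the most recent activity across all substream sockets and compares the resulting inactivity against a transport policy. A minimal sketch, assuming a fixed TTL in seconds; `IdleSeconds` and `ShouldDisconnect` are hypothetical helpers, and the simple threshold stands in for `TransportHandler::IsStreamTTLElapsed`, whose actual policy is not shown on this page.

```cpp
#include <algorithm>
#include <cstddef>
#include <ctime>
#include <vector>

// Seconds since the most recent activity on any substream socket
std::time_t IdleSeconds( const std::vector<std::time_t> &lastActivity,
                         std::time_t                     now )
{
  std::time_t latest = 0;
  for( std::time_t t : lastActivity )
    latest = std::max( latest, t );
  return now - latest;
}

// Hypothetical TTL policy: disconnect only when nothing is queued for
// sending and the stream has been idle for at least 'ttl' seconds
bool ShouldDisconnect( std::size_t outgoingMessages,
                       std::time_t idle,
                       std::time_t ttl )
{
  return outgoingMessages == 0 && idle >= ttl;
}
```

Taking the maximum means that one busy substream keeps the whole stream alive, which matches the loop over `pSubStreams` in the listing.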


◆ OnReadyToWrite()

std::pair< Message *, MsgHandler * > XrdCl::Stream::OnReadyToWrite ( uint16_t subStream)

Definition at line 545 of file XrdClStream.cc.

546 {
547 XrdSysMutexHelper scopedLock( pMutex );
548 Log *log = DefaultEnv::GetLog();
549 if( pSubStreams[subStream]->outQueue->IsEmpty() )
550 {
551 log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
552 pSubStreams[subStream]->socket->GetStreamName().c_str() );
553
554 pSubStreams[subStream]->socket->DisableUplink();
555 return std::make_pair( (Message *)0, (MsgHandler *)0 );
556 }
557
558 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
559 h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
560 h.expires,
561 h.stateful );
562 scopedLock.UnLock();
563 if( h.handler )
564 h.handler->OnReadyToSend( h.msg );
565 return std::make_pair( h.msg, h.handler );
566 }

References XrdCl::Log::Dump(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::MsgHandler::OnReadyToSend(), XrdCl::PostMasterMsg, XrdCl::OutQueue::MsgHelper::stateful, and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncMsgWriter::Write().
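The contract of OnReadyToWrite() is: hand out the next queued message, or switch off write notifications when the queue is empty. The sketch below models only that contract; `UplinkModel` and `NextToWrite` are hypothetical names, strings stand in for `Message` objects, and an empty `std::optional` (C++17) plays the role of the null pair returned by the real method.

```cpp
#include <deque>
#include <optional>
#include <string>
#include <utility>

// Hypothetical model of one substream's outgoing side
struct UplinkModel
{
  std::deque<std::string> outQueue;             // messages waiting to be sent
  bool                    uplinkEnabled = true; // write notifications on/off
};

// Return the next message to write, or disable the uplink if there is none
std::optional<std::string> NextToWrite( UplinkModel &link )
{
  if( link.outQueue.empty() )
  {
    link.uplinkEnabled = false;
    return std::nullopt;
  }

  std::string msg = std::move( link.outQueue.front() );
  link.outQueue.pop_front();
  return msg;
}
```

Disabling the uplink on an empty queue avoids repeated write-ready callbacks when there is nothing to send.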


◆ OnWriteTimeout()

bool XrdCl::Stream::OnWriteTimeout ( uint16_t subStream)

On write timeout.

Definition at line 1094 of file XrdClStream.cc.

1095 {
1096 return true;
1097 }

Referenced by XrdCl::AsyncSocketHandler::OnWriteTimeout().


◆ Query()

Status XrdCl::Stream::Query ( uint16_t query,
AnyObject & result )

Query the stream.

Definition at line 1216 of file XrdClStream.cc.

1217 {
1218 switch( query )
1219 {
1220 case StreamQuery::IpAddr:
1221 {
1222 result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1223 return Status();
1224 }
1225
1226 case StreamQuery::IpStack:
1227 {
1228 result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1229 return Status();
1230 }
1231
1232 case StreamQuery::HostName:
1233 {
1234 result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1235 return Status();
1236 }
1237
1238 default:
1239 return Status( stError, errQueryNotSupported );
1240 }
1241 }

References XrdCl::errQueryNotSupported, XrdCl::StreamQuery::HostName, XrdCl::StreamQuery::IpAddr, XrdCl::StreamQuery::IpStack, XrdCl::AnyObject::Set(), and XrdCl::stError.

Referenced by XrdCl::Channel::QueryTransport().


◆ RegisterEventHandler()

void XrdCl::Stream::RegisterEventHandler ( ChannelEventHandler * handler)

Register channel event handler.

Definition at line 1102 of file XrdClStream.cc.

1103 {
1104 pChannelEvHandlers.AddHandler( handler );
1105 }

References XrdCl::ChannelHandlerList::AddHandler().

Referenced by XrdCl::Channel::RegisterEventHandler().


◆ RemoveEventHandler()

void XrdCl::Stream::RemoveEventHandler ( ChannelEventHandler * handler)

Remove a channel event handler.

Definition at line 1110 of file XrdClStream.cc.

1111 {
1112 pChannelEvHandlers.RemoveHandler( handler );
1113 }

References XrdCl::ChannelHandlerList::RemoveHandler().

Referenced by XrdCl::Channel::RemoveEventHandler().


◆ Send()

XRootDStatus XrdCl::Stream::Send ( Message * msg,
MsgHandler * handler,
bool stateful,
time_t expires )

Queue the message for sending.

Definition at line 297 of file XrdClStream.cc.

301 {
302 XrdSysMutexHelper scopedLock( pMutex );
303 Log *log = DefaultEnv::GetLog();
304
305 //--------------------------------------------------------------------------
306 // Check the session ID and bounce if needed
307 //--------------------------------------------------------------------------
308 if( msg->GetSessionId() &&
309 (pSubStreams[0]->status != Socket::Connected ||
310 pSessionId != msg->GetSessionId()) )
311 return XRootDStatus( stError, errInvalidSession );
312
313 //--------------------------------------------------------------------------
314 // Decide on the path to send the message
315 //--------------------------------------------------------------------------
316 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
317 if( pSubStreams.size() <= path.up )
318 {
319 log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
320 "substream %d, using 0 instead", pStreamName.c_str(),
321 msg->GetDescription().c_str(), path.up );
322 path.up = 0;
323 }
324
325 log->Dump( PostMasterMsg, "[%s] Sending message %s (0x%x) through "
326 "substream %d expecting answer at %d", pStreamName.c_str(),
327 msg->GetDescription().c_str(), msg, path.up, path.down );
328
329 //--------------------------------------------------------------------------
330 // Enable *a* path and insert the message to the right queue
331 //--------------------------------------------------------------------------
332 XRootDStatus st = EnableLink( path );
333 if( st.IsOK() )
334 {
335 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
336 pSubStreams[path.up]->outQueue->PushBack( msg, handler,
337 expires, stateful );
338 }
339 else
340 st.status = stFatal;
341 return st;
342 }

References XrdCl::Socket::Connected, XrdCl::PathID::down, XrdCl::Log::Dump(), EnableLink(), XrdCl::errInvalidSession, XrdCl::Message::GetDescription(), XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetSessionId(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::MultiplexSubStream(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stError, XrdCl::stFatal, XrdCl::PathID::up, and XrdCl::Log::Warning().

Referenced by XrdCl::Channel::Send().
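Two checks in Send() are easy to state on their own: the session check that bounces a message, and the fallback to substream 0 when the transport picks an uplink that does not exist. The sketch below isolates them; `PathModel`, `ClampPath` and `SessionMismatch` are hypothetical names, and the 64-bit session ID type is an assumption.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical stand-in for PathID: uplink and downlink substream numbers
struct PathModel
{
  std::uint16_t up;
  std::uint16_t down;
};

// Fall back to substream 0 when the requested uplink does not exist
PathModel ClampPath( PathModel path, std::size_t subStreamCount )
{
  if( subStreamCount <= path.up )
    path.up = 0;
  return path;
}

// A message bound to a session (non-zero ID) is bounced unless the main
// substream is connected and the session IDs match
bool SessionMismatch( std::uint64_t msgSession,
                      std::uint64_t streamSession,
                      bool          mainConnected )
{
  return msgSession != 0 &&
         ( !mainConnected || msgSession != streamSession );
}
```

A message with session ID 0 is never bounced by this check, which is what lets the first request open a connection.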


◆ SetChannelData()

void XrdCl::Stream::SetChannelData ( AnyObject * channelData)
inline

Set the channel data.

Definition at line 115 of file XrdClStream.hh.

116 {
117 pChannelData = channelData;
118 }

Referenced by XrdCl::Channel::Channel().


◆ SetIncomingQueue()

void XrdCl::Stream::SetIncomingQueue ( InQueue * incomingQueue)
inline

Set the incoming queue.

Definition at line 107 of file XrdClStream.hh.

108 {
109 pIncomingQueue = incomingQueue;
110 }

Referenced by XrdCl::Channel::Channel().


◆ SetJobManager()

void XrdCl::Stream::SetJobManager ( JobManager * jobManager)
inline

Set job manager.

Definition at line 131 of file XrdClStream.hh.

132 {
133 pJobManager = jobManager;
134 }

Referenced by XrdCl::Channel::Channel().


◆ SetOnDataConnectHandler()

void XrdCl::Stream::SetOnDataConnectHandler ( std::shared_ptr< Job > & onConnJob)
inline

Set the on-connect handler for data streams.

Definition at line 263 of file XrdClStream.hh.

264 {
265 XrdSysMutexHelper scopedLock( pMutex );
266 pOnDataConnJob = onConnJob;
267 }

Referenced by XrdCl::Channel::SetOnDataConnectHandler().


◆ SetPoller()

void XrdCl::Stream::SetPoller ( Poller * poller)
inline

Set the poller.

Definition at line 99 of file XrdClStream.hh.

100 {
101 pPoller = poller;
102 }

Referenced by XrdCl::Channel::Channel().


◆ SetTaskManager()

void XrdCl::Stream::SetTaskManager ( TaskManager * taskManager)
inline

Set task manager.

Definition at line 123 of file XrdClStream.hh.

124 {
125 pTaskManager = taskManager;
126 }

Referenced by XrdCl::Channel::Channel().


◆ SetTransport()

void XrdCl::Stream::SetTransport ( TransportHandler * transport)
inline

Set the transport.

Definition at line 91 of file XrdClStream.hh.

92 {
93 pTransport = transport;
94 }

Referenced by XrdCl::Channel::Channel().


◆ Tick()

void XrdCl::Stream::Tick ( time_t now)

Handle a clock event generated either by a socket timeout or by a task manager event.

Definition at line 377 of file XrdClStream.cc.

378 {
379 //--------------------------------------------------------------------------
380 // Check for timed-out requests and incoming handlers
381 //--------------------------------------------------------------------------
382 pMutex.Lock();
383 OutQueue q;
384 SubStreamList::iterator it;
385 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
386 q.GrabExpired( *(*it)->outQueue, now );
387 pMutex.UnLock();
388
389 q.Report( XRootDStatus( stError, errOperationExpired ) );
390 pIncomingQueue->ReportTimeout( now );
391 }

References XrdCl::errOperationExpired, XrdCl::OutQueue::GrabExpired(), XrdSysMutex::Lock(), XrdCl::OutQueue::Report(), XrdCl::InQueue::ReportTimeout(), XrdCl::stError, and XrdSysMutex::UnLock().

Referenced by XrdCl::Channel::Tick().
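Tick() collects expired requests from every outgoing queue while holding the mutex and reports them only after releasing it. The collection step might look like the sketch below; `PendingModel` is a hypothetical record, this `GrabExpired` is a free function rather than the `OutQueue` member, and treating `expires <= now` as expired is an assumption about the real comparison.

```cpp
#include <algorithm>
#include <ctime>
#include <iterator>
#include <string>
#include <vector>

// Hypothetical record for one queued request
struct PendingModel
{
  std::string name;    // label used only for illustration
  std::time_t expires; // absolute deadline
};

// Move every entry whose deadline has passed out of 'queue' and return them
std::vector<PendingModel> GrabExpired( std::vector<PendingModel> &queue,
                                       std::time_t                now )
{
  // Keep live entries in front, preserving the relative order of both groups
  auto firstExpired = std::stable_partition( queue.begin(), queue.end(),
      [now]( const PendingModel &p ) { return p.expires > now; } );

  std::vector<PendingModel> expired( std::make_move_iterator( firstExpired ),
                                     std::make_move_iterator( queue.end() ) );
  queue.erase( firstExpired, queue.end() );
  return expired;
}
```

Collecting into a separate container first is what allows the handlers to be notified outside the lock, as the listing does with `q.Report()`.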


The documentation for this class was generated from the following files: