13 #include <zypp-core/zyppng/base/AutoDisconnect> 14 #include <zypp-proto/core/envelope.pb.h> 19 : _data( new
zypp::proto::Envelope() )
23 : _data( new
zypp::proto::Envelope(
std::move(data) ) )
28 _data->set_messagetypename( std::move(name) );
33 return _data->messagetypename();
38 _data->set_value( std::move(name) );
43 return _data->value();
48 return _data->SerializeAsString();
89 zypp::proto::Envelope m;
90 if (! m.ParseFromArray( bytes.data(), bytes.size() ) ) {
91 ERR <<
"Received malformed message from peer" << std::endl;
118 if ( !_messages.size () ) {
122 _sigNextMessage.block ();
124 _sigNextMessage.unblock();
129 if ( !_messages.size () )
133 std::optional<RpcMessage> res;
135 if( msgName.empty() ) {
136 res = std::move( _messages.front () );
137 _messages.pop_front();
140 const auto i = std::find_if( _messages.begin(), _messages.end(), [&](
const RpcMessage &
env ) {
141 return env.messagetypename() == msgName;
144 if ( i != _messages.end() ) {
150 if ( _messages.size() )
151 _nextMessageTimer->start(0);
153 _nextMessageTimer->stop();
166 bool receivedInvalidMsg =
false;
168 receivedInvalidMsg =
true;
171 const bool hasMsgName = msgName.size();
172 while ( !receivedInvalidMsg &&
_ioDev->isOpen() &&
_ioDev->canRead() ) {
175 std::optional<RpcMessage> msg =
nextMessage(msgName);
176 if ( msg )
return msg;
183 if ( !
_ioDev->waitForReadyRead ( -1 ) ) {
193 if ( !_ioDev->canWrite () )
196 const auto &
str =
env._data->SerializeAsString();
199 _ioDev->write(
str.data(),
str.size() );
205 return _sigNextMessage;
216 while ( cont &&
_ioDev->bytesAvailable() ) {
225 zypp::proto::Envelope* rwcowClone<zypp::proto::Envelope>(
const zypp::proto::Envelope * rhs )
227 return new zypp::proto::Envelope(*rhs);
std::deque< RpcMessage > _messages
Signal< void()> _sigInvalidMessageReceived
Namespace intended to collect all environment variables we use.
zyppng::rpc::HeaderSizeType _pendingMessageSize
zypp::RWCOW_pointer< zypp::proto::Envelope > _data
std::string serialize() const
String-related utilities and regular expression matching.
SignalProxy< void()> sigInvalidMessageReceived()
virtual void serializeInto(std::string &str) const =0
SignalProxy< void()> sigMessageReceived()
bool sendMessage(const RpcMessage &env)
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
void set_value(std::string name)
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
void set_messagetypename(std::string name)
std::optional< RpcMessage > nextMessageWait(const std::string &msgName="")
The Timer class provides repetitive and single-shot timers.
const std::string & messagetypename() const
std::shared_ptr< IODevice > Ptr
Timer::Ptr _nextMessageTimer
const std::string & value() const
Base class for exceptions.
std::optional< RpcMessage > nextMessage(const std::string &msgName="")
SignalProxy< void()> sigReadyRead()
static auto connectFunc(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, ReceiverFunc &&rFunc, const Tracker &...trackers)
RpcMessageStream(IODevice::Ptr iostr)
void timeout(const zyppng::Timer &)
Signal< void()> _sigNextMessage
InvalidMessageReceivedException(const std::string &msg={})
Easy-to-use interface to the ZYPP dependency resolver.
virtual std::string serialize() const