libzypp 17.34.1
messagestream.cc
Go to the documentation of this file.
1/*---------------------------------------------------------------------\
2| ____ _ __ __ ___ |
3| |__ / \ / / . \ . \ |
4| / / \ V /| _/ _/ |
5| / /__ | | | | | | |
6| /_____||_| |_| |_| |
7| |
8----------------------------------------------------------------------*/
9
10#include "messagestream.h"
11
13#include <zypp-core/zyppng/base/AutoDisconnect>
14#include <zypp-proto/core/envelope.pb.h>
15
16namespace zyppng {
17
19 : _data( new zypp::proto::Envelope() )
20 { }
21
22 RpcMessage::RpcMessage( zypp::proto::Envelope data )
23 : _data( new zypp::proto::Envelope( std::move(data) ) )
24 { }
25
26 void RpcMessage::set_messagetypename(std::string name)
27 {
28 _data->set_messagetypename( std::move(name) );
29 }
30
31 const std::string &RpcMessage::messagetypename() const
32 {
33 return _data->messagetypename();
34 }
35
36 void RpcMessage::set_value(std::string name)
37 {
38 _data->set_value( std::move(name) );
39 }
40
41 const std::string &RpcMessage::value() const
42 {
43 return _data->value();
44 }
45
46 std::string RpcMessage::serialize() const
47 {
48 return _data->SerializeAsString();
49 }
50
51 std::string RpcBaseType::serialize() const
52 {
53 std::string target;
54 serializeInto ( target );
55 return target;
56 }
57
58
60 : zypp::Exception( zypp::str::Str() << "Invalid Message received: (" << msg <<")" )
61 { }
62
63
65 {
67 _nextMessageTimer->setSingleShot(false);
68
70 if ( _ioDev->isOpen () && _ioDev->canRead () )
72 }
73
75 {
76 if ( _pendingMessageSize == 0 ) {
77 if ( _ioDev->bytesAvailable() >= sizeof( rpc::HeaderSizeType ) ) {
78 _ioDev->read( reinterpret_cast<char *>( &_pendingMessageSize ), sizeof( rpc::HeaderSizeType ) );
79 }
80 }
81
82 if ( _ioDev->bytesAvailable() < _pendingMessageSize ) {
83 return false;
84 }
85
86 auto bytes = _ioDev->read( _pendingMessageSize );
88
89 zypp::proto::Envelope m;
90 if (! m.ParseFromArray( bytes.data(), bytes.size() ) ) {
91 ERR << "Received malformed message from peer" << std::endl;
93 return false;
94 }
95
96 _messages.emplace_back( std::move(m) );
97 _sigNextMessage.emit ();
98
99 if ( _messages.size() ) {
100 // nag the user code until all messages have been used up
101 _nextMessageTimer->start(0);
102 }
103
104 return true;
105 }
106
108 {
109 if ( _messages.size() )
110 _sigNextMessage.emit();
111
112 if ( !_messages.size() )
113 _nextMessageTimer->stop();
114 }
115
116 std::optional<RpcMessage> zyppng::RpcMessageStream::nextMessage( const std::string &msgName )
117 {
118 if ( !_messages.size () ) {
119
120 // try to read the next messages from the fd
121 {
122 _sigNextMessage.block ();
123 zypp::OnScopeExit unblock([&](){
124 _sigNextMessage.unblock();
125 });
126 readAllMessages();
127 }
128
129 if ( !_messages.size () )
130 return {};
131 }
132
133 std::optional<RpcMessage> res;
134
135 if( msgName.empty() ) {
136 res = std::move( _messages.front () );
137 _messages.pop_front();
138
139 } else {
140 const auto i = std::find_if( _messages.begin(), _messages.end(), [&]( const RpcMessage &env ) {
141 return env.messagetypename() == msgName;
142 });
143
144 if ( i != _messages.end() ) {
145 res = std::move(*i);
146 _messages.erase(i);
147 }
148 }
149
150 if ( _messages.size() )
151 _nextMessageTimer->start(0);
152 else
153 _nextMessageTimer->stop();
154
155 return res;
156 }
157
// Waiting variant of nextMessage(): keeps waiting on the device until a
// message can be returned or waiting is no longer possible.
//
// msgName: when non-empty, only a message with that type name is accepted
//          inside the loop; when empty, any queued message ends the waiting.
//
// Returns the result of nextMessage(msgName), or an empty optional when
// waitForReadyRead() reports failure.
158 std::optional<RpcMessage> RpcMessageStream::nextMessageWait( const std::string &msgName )
159 {
160 // make sure the signal is not emitted until we have the next message
161 _sigNextMessage.block ();
162 zypp::OnScopeExit unblock([&](){
163 _sigNextMessage.unblock();
164 });
165
// NOTE(review): listing line 167 is missing from this view. The two lines
// after the flag declaration look like the tail of a callback registration
// that sets receivedInvalidMsg - confirm against the real source file.
166 bool receivedInvalidMsg = false;
168 receivedInvalidMsg = true;
169 }));
170
171 const bool hasMsgName = msgName.size();
// Loop only while no invalid message was flagged and the device is still
// open and readable.
172 while ( !receivedInvalidMsg && _ioDev->isOpen() && _ioDev->canRead() ) {
173 if ( _messages.size() ) {
174 if ( hasMsgName ) {
// A specific type was requested: return it if it is already queued,
// otherwise fall through and wait for more data.
175 std::optional<RpcMessage> msg = nextMessage(msgName);
176 if ( msg ) return msg;
177 }
178 else {
// Any message will do: leave the loop and let the final nextMessage() fetch it.
179 break;
180 }
181 }
182
// presumably -1 means "no timeout" - TODO confirm against IODevice::waitForReadyRead
183 if ( !_ioDev->waitForReadyRead ( -1 ) ) {
184 // this can only mean that an error happened, like device was closed
185 return {};
186 }
187 }
// Reached via the break above or when the loop condition turned false.
188 return nextMessage (msgName);
189 }
190
192 {
193 if ( !_ioDev->canWrite () )
194 return false;
195
196 const auto &str = env._data->SerializeAsString();
197 rpc::HeaderSizeType msgSize = str.length();
198 _ioDev->write( (char *)(&msgSize), sizeof( rpc::HeaderSizeType ) );
199 _ioDev->write( str.data(), str.size() );
200 return true;
201 }
202
204 {
205 return _sigNextMessage;
206 }
207
212
214 {
215 bool cont = true;
216 while ( cont && _ioDev->bytesAvailable() ) {
217 cont = readNextMessage ();
218 }
219 }
220
221}
222
223namespace zypp {
224 template<>
225 zypp::proto::Envelope* rwcowClone<zypp::proto::Envelope>( const zypp::proto::Envelope * rhs )
226 {
227 return new zypp::proto::Envelope(*rhs);
228 }
229}
Base class for Exception.
Definition Exception.h:147
static auto connectFunc(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, ReceiverFunc &&rFunc, const Tracker &...trackers)
Definition base.h:163
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
Definition base.h:142
SignalProxy< void()> sigReadyRead()
Definition iodevice.cc:324
std::shared_ptr< IODevice > Ptr
Definition iodevice.h:44
InvalidMessageReceivedException(const std::string &msg={})
virtual std::string serialize() const
virtual void serializeInto(std::string &str) const =0
Signal< void()> _sigInvalidMessageReceived
SignalProxy< void()> sigMessageReceived()
Signal< void()> _sigNextMessage
zyppng::rpc::HeaderSizeType _pendingMessageSize
std::optional< RpcMessage > nextMessage(const std::string &msgName="")
RpcMessageStream(IODevice::Ptr iostr)
std::deque< RpcMessage > _messages
bool sendMessage(const RpcMessage &env)
SignalProxy< void()> sigInvalidMessageReceived()
void timeout(const zyppng::Timer &)
std::optional< RpcMessage > nextMessageWait(const std::string &msgName="")
zypp::RWCOW_pointer< zypp::proto::Envelope > _data
void set_value(std::string name)
void set_messagetypename(std::string name)
std::string serialize() const
const std::string & value() const
const std::string & messagetypename() const
The Timer class provides repetitive and single-shot timers.
Definition timer.h:45
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
Definition timer.cc:120
Definition Arch.h:364
String related utilities and Regular expression matching.
Namespace intended to collect all environment variables we use.
Definition Env.h:23
Easy-to-use interface to the ZYPP dependency resolver.
zypp::proto::Envelope * rwcowClone< zypp::proto::Envelope >(const zypp::proto::Envelope *rhs)
uint32_t HeaderSizeType
Definition rpc.h:17
#define ERR
Definition Logger.h:100