Open-Transactions  0.93.0-ge03d287
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
opentxs::OTSocket_ZMQ_4 Class Reference

#include <Socket_ZMQ4.hpp>

Inheritance diagram for opentxs::OTSocket_ZMQ_4:
Collaboration diagram for opentxs::OTSocket_ZMQ_4:

Classes

class  ZMQ4
 

Public Member Functions

EXPORT OTSocket_ZMQ_4 ()
 
EXPORT ~OTSocket_ZMQ_4 ()
 
EXPORT bool NewContext ()
 
EXPORT bool RemakeSocket (const bool bNewContext=false)
 
EXPORT bool Connect ()
 
EXPORT bool Listen ()
 
EXPORT bool Connect (const OTString &strConnectPath)
 
EXPORT bool Listen (const OTString &strBindingPath)
 
EXPORT bool Send (const OTASCIIArmor &ascEnvelope)
 
EXPORT bool Send (const OTASCIIArmor &ascEnvelope, const OTString &strConnectPath)
 
EXPORT bool Receive (OTString &strServerReply)
 
- Public Member Functions inherited from opentxs::OTSocket
virtual ~OTSocket ()
 
EXPORT std::mutex * GetMutex ()
 
EXPORT bool Init (const Defaults &defaults)
 
EXPORT bool Init (const Defaults &defaults, OTSettings *pSettings)
 
EXPORT bool IsInitialized () const
 
EXPORT bool HasContext () const
 
EXPORT bool IsConnected () const
 
EXPORT bool IsListening () const
 
EXPORT const OTStringGetConnectPath () const
 
EXPORT const OTStringGetBindingPath () const
 

Additional Inherited Members

- Protected Member Functions inherited from opentxs::OTSocket
 OTSocket ()
 
- Protected Attributes inherited from opentxs::OTSocket
int64_t m_lLatencySendMs
 
int32_t m_nLatencySendNoTries
 
int64_t m_lLatencyReceiveMs
 
int32_t m_nLatencyReceiveNoTries
 
int64_t m_lLatencyDelayAfter
 
bool m_bIsBlocking
 
bool m_bInitialized
 
bool m_HasContext
 
bool m_bConnected
 
bool m_bListening
 
OTString m_strConnectPath
 
OTString m_strBindingPath
 
OTASCIIArmor m_ascLastMsgSent
 

Detailed Description

Definition at line 141 of file Socket_ZMQ4.hpp.

Constructor & Destructor Documentation

opentxs::OTSocket_ZMQ_4::OTSocket_ZMQ_4 ( )

Definition at line 164 of file Socket_ZMQ4.cpp.

165  : m_pzmq(new ZMQ4())
166 {
167 }
opentxs::OTSocket_ZMQ_4::~OTSocket_ZMQ_4 ( )

Definition at line 169 of file Socket_ZMQ4.cpp.

170 {
171  CloseSocket();
172  delete (m_pzmq);
173 }

Member Function Documentation

bool opentxs::OTSocket_ZMQ_4::Connect ( )
virtual

Implements opentxs::OTSocket.

Definition at line 274 of file Socket_ZMQ4.cpp.

275 {
276  if (!m_bInitialized) {
277  OT_FAIL;
278  }
279  if (!m_HasContext) {
280  OT_FAIL;
281  }
282 
283  if (nullptr == m_pzmq->context_zmq) {
284  OTLog::vError("%s: Error: %s must exist to Listen!\n", __FUNCTION__,
285  "m_pzmq->context_zmq");
286  OT_FAIL;
287  }
288  if (true == m_bListening) {
289  OTLog::vError("%s: Error: Must not be Listening, to Connect!\n",
290  __FUNCTION__);
291  OT_FAIL;
292  }
293 
294  if (!m_strConnectPath.Exists()) {
295  OT_FAIL;
296  }
297 
298  if (!NewSocket(true)) return false; // NewSocket(true), Request Socket.
299 
300  try {
301  m_pzmq->socket_zmq->connect(m_strConnectPath.Get());
302  }
303  catch (std::exception& e) {
304  OTLog::vError("%s: Exception Caught: %s \n", __FUNCTION__, e.what());
305  OT_FAIL;
306  }
307 
308  m_bConnected = true;
309  return true;
310 }
static EXPORT void vError(const char *szError,...)
Definition: OTLog.cpp:800
OTString m_strConnectPath
Definition: OTSocket.hpp:198
EXPORT bool Exists() const
Definition: OTString.cpp:1035
#define OT_FAIL
Definition: Assert.hpp:139
EXPORT const char * Get() const
Definition: OTString.cpp:1045
bool opentxs::OTSocket_ZMQ_4::Connect ( const OTString strConnectPath)
virtual

Implements opentxs::OTSocket.

Definition at line 353 of file Socket_ZMQ4.cpp.

354 {
355  if (!strConnectPath.Exists()) {
356  OTLog::vError("%s: Error: %s dosn't exist!\n", __FUNCTION__,
357  "strConnectPath");
358  OT_FAIL;
359  }
360  if (5 > strConnectPath.GetLength()) {
361  OTLog::vError("%s: Error: %s is too short!\n", __FUNCTION__,
362  "strConnectPath");
363  OT_FAIL;
364  }
365 
366  m_strConnectPath = strConnectPath; // set the connection path.
367 
368  return (Connect());
369 }
static EXPORT void vError(const char *szError,...)
Definition: OTLog.cpp:800
OTString m_strConnectPath
Definition: OTSocket.hpp:198
#define OT_FAIL
Definition: Assert.hpp:139
bool opentxs::OTSocket_ZMQ_4::Listen ( )
virtual

Implements opentxs::OTSocket.

Definition at line 312 of file Socket_ZMQ4.cpp.

313 {
314  if (!m_bInitialized) {
315  OT_FAIL;
316  }
317  if (!m_HasContext) {
318  OT_FAIL;
319  }
320 
321  if (nullptr == m_pzmq->context_zmq) {
322  OTLog::vError("%s: Error: %s must exist to Listen!\n", __FUNCTION__,
323  "m_pzmq->context_zmq");
324  OT_FAIL;
325  }
326  if (true == m_bConnected) {
327  OTLog::vError("%s: Error: Must not be Connected, to Listen!\n",
328  __FUNCTION__);
329  OT_FAIL;
330  }
331 
332  if (!m_strBindingPath.Exists()) {
333  OT_FAIL;
334  }
335 
336  if (!NewSocket(false)) return false; // NewSocket(false), Responce Socket.
337 
338  try {
339  m_pzmq->socket_zmq->bind(m_strBindingPath.Get()); // since
340  // m_strBindingPath
341  // was checked and set
342  // above.
343  }
344  catch (std::exception& e) {
345  OTLog::vError("%s: Exception Caught: %s \n", __FUNCTION__, e.what());
346  OT_FAIL;
347  }
348 
349  m_bListening = true;
350  return true;
351 }
static EXPORT void vError(const char *szError,...)
Definition: OTLog.cpp:800
OTString m_strBindingPath
Definition: OTSocket.hpp:199
EXPORT bool Exists() const
Definition: OTString.cpp:1035
#define OT_FAIL
Definition: Assert.hpp:139
EXPORT const char * Get() const
Definition: OTString.cpp:1045
bool opentxs::OTSocket_ZMQ_4::Listen ( const OTString strBindingPath)
virtual

Implements opentxs::OTSocket.

Definition at line 371 of file Socket_ZMQ4.cpp.

372 {
373  if (!strBindingPath.Exists()) {
374  OTLog::vError("%s: Error: %s dosn't exist!\n", __FUNCTION__,
375  "strBindingPath");
376  OT_FAIL;
377  }
378  if (5 > strBindingPath.GetLength()) {
379  OTLog::vError("%s: Error: %s is too short!\n", __FUNCTION__,
380  "strBindingPath");
381  OT_FAIL;
382  }
383 
384  m_strBindingPath = strBindingPath;
385 
386  return (Listen());
387 }
static EXPORT void vError(const char *szError,...)
Definition: OTLog.cpp:800
OTString m_strBindingPath
Definition: OTSocket.hpp:199
#define OT_FAIL
Definition: Assert.hpp:139
bool opentxs::OTSocket_ZMQ_4::NewContext ( )
virtual

Implements opentxs::OTSocket.

Definition at line 229 of file Socket_ZMQ4.cpp.

230 {
231  if (!m_bInitialized) return false;
232 
233  m_HasContext = false;
234 
235  if (!CloseSocket(true)) return false;
236 
237  if (nullptr != m_pzmq->context_zmq) zmq_term(m_pzmq->context_zmq);
238  if (nullptr != m_pzmq->context_zmq) delete m_pzmq->context_zmq;
239  m_pzmq->context_zmq = nullptr;
240 
241  try {
242  m_pzmq->context_zmq = new zmq::context_t(
243  1, 31); // Threads, Max Sockets. (31 is a sane default).
244  }
245  catch (std::exception& e) {
246  OTLog::vError("%s: Exception Caught: %s \n", __FUNCTION__, e.what());
247  OT_FAIL;
248  }
249 
250  m_HasContext = true;
251  return true;
252 }
static EXPORT void vError(const char *szError,...)
Definition: OTLog.cpp:800
#define OT_FAIL
Definition: Assert.hpp:139
bool opentxs::OTSocket_ZMQ_4::Receive ( OTString strServerReply)
virtual

Implements opentxs::OTSocket.

Definition at line 521 of file Socket_ZMQ4.cpp.

522 {
523  if (!m_bInitialized) {
524  OT_FAIL;
525  }
526  if (!m_HasContext) {
527  OT_FAIL;
528  }
529  if (nullptr == m_pzmq->context_zmq) {
530  OTLog::vError("%s: Error: %s must exist to Receive!\n", __FUNCTION__,
531  "m_pzmq->context_zmq");
532  OT_FAIL;
533  }
534 
535  if (!m_bConnected && !m_bListening) return false;
536  if (m_bConnected && m_bListening) return false;
537  if (nullptr == m_pzmq->socket_zmq) {
538  OTLog::vError("%s: Error: %s must exist to Receive!\n", __FUNCTION__,
539  "m_pzmq->socket_zmq");
540  OT_FAIL;
541  }
542 
543  // -----------------------------------
544  const int64_t lLatencyRecvMilliSec = m_lLatencyReceiveMs;
545 
546  //  Get the reply.
547  zmq::message_t zmq_message;
548 
549  bool bSuccessReceiving = false;
550 
551  // If failure receiving, re-tries 2 times, with 4000 ms max delay between
552  // each (Doubling every time.)
553  //
554  if (m_bIsBlocking) {
555  try {
556  bSuccessReceiving =
557  m_pzmq->socket_zmq->recv(&zmq_message); // Blocking.
558  }
559  catch (std::exception& e) {
560  OTLog::vError("%s: Exception Caught: %s \n", __FUNCTION__,
561  e.what());
562  OT_FAIL;
563  }
564  }
565  else // not blocking
566  {
567  int64_t lDoubling = lLatencyRecvMilliSec;
568  int32_t nReceiveTries = m_nLatencyReceiveNoTries;
569  bool expect_reply = true;
570  while (expect_reply) {
571  // Poll socket for a reply, with timeout
572  zmq::pollitem_t items[] = {{*m_pzmq->socket_zmq, 0, ZMQ_POLLIN, 0}};
573 
574  int nPoll = 0;
575  try {
576  nPoll = zmq::poll(&items[0], 1, static_cast<long>(lDoubling));
577  }
578  catch (std::exception& e) {
579  OTLog::vError("%s: Exception Caught: %s \n", __FUNCTION__,
580  e.what());
581  OT_FAIL;
582  }
583 
584  lDoubling *= 2;
585 
586  // If we got a reply, process it
587  if (items[0].revents & ZMQ_POLLIN) {
588  try {
589  bSuccessReceiving = m_pzmq->socket_zmq->recv(
590  &zmq_message,
591  ZMQ_NOBLOCK); // <=========== RECEIVE ===============
592  }
593  catch (std::exception& e) {
594  OTLog::vError("%s: Exception Caught: %s \n", __FUNCTION__,
595  e.what());
596  OT_FAIL;
597  }
599 
600  if (!bSuccessReceiving) {
601  if (!HandleReceivingError()) expect_reply = false;
602  }
603  else
604  break; // (Success -- we're done in this loop.)
605  }
606  else if (nReceiveTries == 0) {
607  // OTLog::Error("OTSocket::Receive: no message.\n");
608  expect_reply = false;
609  break;
610  }
611  else if ((-1) == nPoll) // error.
612  {
613  if (!HandlePollingError()) expect_reply = false;
614  }
615 
616  --nReceiveTries;
617  }
618  }
619 
620  if (bSuccessReceiving && (zmq_message.size() > 0))
621  strServerReply.MemSet(static_cast<const char*>(zmq_message.data()),
622  static_cast<uint32_t>(zmq_message.size()));
623 
624  return (bSuccessReceiving && (zmq_message.size() > 0));
625 }
static EXPORT void vError(const char *szError,...)
Definition: OTLog.cpp:800
int32_t m_nLatencyReceiveNoTries
Definition: OTSocket.hpp:189
int64_t m_lLatencyReceiveMs
Definition: OTSocket.hpp:188
static EXPORT bool SleepMilliseconds(int64_t lMilliseconds)
Definition: OTLog.cpp:651
#define OT_FAIL
Definition: Assert.hpp:139
bool opentxs::OTSocket_ZMQ_4::RemakeSocket ( const bool  bNewContext = false)
virtual

Implements opentxs::OTSocket.

Definition at line 254 of file Socket_ZMQ4.cpp.

255 {
256 
257  if (!m_bInitialized) return false;
258  if (!m_HasContext) return false;
259 
260  if (!m_bConnected || !m_bListening) return false;
261  if (m_bConnected && m_bListening) return false;
262 
263  bool bConnected = m_bConnected;
264  bool bListening = m_bListening;
265 
266  if (bNewContext) NewContext();
267 
268  if (bConnected) return Connect();
269  if (bListening) return Listen();
270 
271  return false;
272 }
EXPORT bool NewContext()
bool opentxs::OTSocket_ZMQ_4::Send ( const OTASCIIArmor ascEnvelope)
virtual

Implements opentxs::OTSocket.

Definition at line 389 of file Socket_ZMQ4.cpp.

390 {
391  if (!m_bInitialized) {
392  OT_FAIL;
393  }
394 
395  if (0 >= ascEnvelope.GetLength()) {
396  OTLog::vError("%s: Error: %s is zero length!\n", __FUNCTION__,
397  "ascEnvelope");
398  OT_FAIL;
399  }
400  m_ascLastMsgSent.Set(ascEnvelope); // In case we need to re-send.
401 
402  if (!m_HasContext) {
403  OT_FAIL;
404  }
405  if (nullptr == m_pzmq->context_zmq) {
406  OTLog::vError("%s: Error: %s must exist to Send!\n", __FUNCTION__,
407  "m_pzmq->context_zmq");
408  OT_FAIL;
409  }
410 
411  if (!m_bConnected && !m_bListening) return false;
412  if (m_bConnected && m_bListening) return false;
413  if (nullptr == m_pzmq->socket_zmq) {
414  OTLog::vError("%s: Error: %s must exist to Send!\n", __FUNCTION__,
415  "m_pzmq->socket_zmq");
416  OT_FAIL;
417  }
418 
419  // -----------------------------------
420  const int64_t lLatencySendMilliSec = m_lLatencySendMs;
421 
422  zmq::message_t zmq_message(ascEnvelope.GetLength());
423  memcpy((void*)zmq_message.data(), ascEnvelope.Get(),
424  ascEnvelope.GetLength());
425 
426  bool bSuccessSending = false;
427 
428  if (m_bIsBlocking) {
429  try {
430  bSuccessSending =
431  m_pzmq->socket_zmq->send(zmq_message); // Blocking.
432  }
433  catch (std::exception& e) {
434  OTLog::vError("%s: Exception Caught: %s \n", __FUNCTION__,
435  e.what());
436  OT_FAIL;
437  }
438  }
439  else // not blocking
440  {
441  int32_t nSendTries = m_nLatencySendNoTries;
442  int64_t lDoubling = lLatencySendMilliSec;
443  bool bKeepTrying = true;
444 
445  while (bKeepTrying && (nSendTries > 0)) {
446  zmq::pollitem_t items[] = {
447  {(*m_pzmq->socket_zmq), 0, ZMQ_POLLOUT, 0}};
448 
449  int nPoll = 0;
450  try {
451  nPoll =
452  zmq::poll(&items[0], 1,
453  static_cast<long>(lDoubling)); // ZMQ_POLLOUT, 1
454  // item, timeout
455  // (milliseconds)
456  }
457  catch (std::exception& e) {
458  OTLog::vError("%s: Exception Caught: %s \n", __FUNCTION__,
459  e.what());
460  OT_FAIL;
461  }
462 
463  lDoubling *= 2;
464 
465  if (items[0].revents & ZMQ_POLLOUT) {
466  try {
467  bSuccessSending = m_pzmq->socket_zmq->send(
468  zmq_message,
469  ZMQ_NOBLOCK); // <=========== SEND ===============
470  }
471  catch (std::exception& e) {
472  OTLog::vError("%s: Exception Caught: %s \n", __FUNCTION__,
473  e.what());
474  OT_FAIL;
475  }
476 
478 
479  if (!bSuccessSending) {
480  if (!HandleSendingError()) bKeepTrying = false;
481  }
482  else
483  break; // (Success -- we're done in this loop.)
484  }
485  else if ((-1) == nPoll) // error.
486  {
487  if (!HandlePollingError()) bKeepTrying = false;
488  }
489 
490  --nSendTries;
491  }
492  }
493  /*
494  Normally, we try to send...
495  If the send fails, we wait X ms and then try again (Y times).
496 
497  BUT -- what if the failure was an errno==EAGAIN ?
498  In that case, it's not a REAL failure, but rather, a "failure right now, try
499  again in a sec."
500  */
501 
502  if (bSuccessSending)
504  : 1);
505 
506  return bSuccessSending;
507 }
static EXPORT void vError(const char *szError,...)
Definition: OTLog.cpp:800
int64_t m_lLatencyDelayAfter
Definition: OTSocket.hpp:190
int32_t m_nLatencySendNoTries
Definition: OTSocket.hpp:187
int64_t m_lLatencySendMs
Definition: OTSocket.hpp:186
EXPORT void Set(const char *data, uint32_t enforcedMaxLength=0)
Definition: OTString.cpp:1055
static EXPORT bool SleepMilliseconds(int64_t lMilliseconds)
Definition: OTLog.cpp:651
OTASCIIArmor m_ascLastMsgSent
Definition: OTSocket.hpp:201
#define OT_FAIL
Definition: Assert.hpp:139
bool opentxs::OTSocket_ZMQ_4::Send ( const OTASCIIArmor ascEnvelope,
const OTString strConnectPath 
)
virtual

Implements opentxs::OTSocket.

Definition at line 509 of file Socket_ZMQ4.cpp.

511 {
512  const bool bNewPath = m_strConnectPath.Compare(strConnectPath);
513 
514  if (!bNewPath) Connect(strConnectPath);
515 
516  if (!m_bConnected) OT_FAIL;
517 
518  return Send(ascEnvelope);
519 }
OTString m_strConnectPath
Definition: OTSocket.hpp:198
EXPORT bool Send(const OTASCIIArmor &ascEnvelope)
EXPORT bool Compare(const char *compare) const
Definition: OTString.cpp:1102
#define OT_FAIL
Definition: Assert.hpp:139

The documentation for this class was generated from the following files: