133 #include "../core/stdafx.hpp"
135 #include "../core/OTLog.hpp"
137 #include <cppzmq/zmq.hpp>
153 : context_zmq(nullptr)
154 , socket_zmq(nullptr)
175 bool OTSocket_ZMQ_4::CloseSocket(
const bool bNewContext )
190 bool OTSocket_ZMQ_4::NewSocket(
const bool bIsRequest)
195 if (!CloseSocket())
return false;
200 bIsRequest ? ZMQ_REQ : ZMQ_REP);
202 catch (std::exception& e) {
203 OTLog::vError(
"%s: Exception Caught: %s \n", __FUNCTION__, e.what());
208 OTLog::vError(
"%s: Error: %s failed to be created!\n", __FUNCTION__,
209 "m_pzmq->socket_zmq");
213 const int linger = 0;
216 m_pzmq->
socket_zmq->setsockopt(ZMQ_LINGER, &linger,
sizeof(linger));
218 catch (std::exception& e) {
219 OTLog::vError(
"%s: Exception Caught: %s \n", __FUNCTION__, e.what());
235 if (!CloseSocket(
true))
return false;
245 catch (std::exception& e) {
246 OTLog::vError(
"%s: Exception Caught: %s \n", __FUNCTION__, e.what());
268 if (bConnected)
return Connect();
269 if (bListening)
return Listen();
284 OTLog::vError(
"%s: Error: %s must exist to Listen!\n", __FUNCTION__,
285 "m_pzmq->context_zmq");
289 OTLog::vError(
"%s: Error: Must not be Listening, to Connect!\n",
298 if (!NewSocket(
true))
return false;
303 catch (std::exception& e) {
304 OTLog::vError(
"%s: Exception Caught: %s \n", __FUNCTION__, e.what());
322 OTLog::vError(
"%s: Error: %s must exist to Listen!\n", __FUNCTION__,
323 "m_pzmq->context_zmq");
327 OTLog::vError(
"%s: Error: Must not be Connected, to Listen!\n",
336 if (!NewSocket(
false))
return false;
344 catch (std::exception& e) {
345 OTLog::vError(
"%s: Exception Caught: %s \n", __FUNCTION__, e.what());
355 if (!strConnectPath.
Exists()) {
373 if (!strBindingPath.
Exists()) {
396 OTLog::vError(
"%s: Error: %s is zero length!\n", __FUNCTION__,
406 OTLog::vError(
"%s: Error: %s must exist to Send!\n", __FUNCTION__,
407 "m_pzmq->context_zmq");
414 OTLog::vError(
"%s: Error: %s must exist to Send!\n", __FUNCTION__,
415 "m_pzmq->socket_zmq");
422 zmq::message_t zmq_message(ascEnvelope.
GetLength());
423 memcpy((
void*)zmq_message.data(), ascEnvelope.
Get(),
426 bool bSuccessSending =
false;
433 catch (std::exception& e) {
442 int64_t lDoubling = lLatencySendMilliSec;
443 bool bKeepTrying =
true;
445 while (bKeepTrying && (nSendTries > 0)) {
446 zmq::pollitem_t items[] = {
452 zmq::poll(&items[0], 1,
453 static_cast<long>(lDoubling));
457 catch (std::exception& e) {
465 if (items[0].revents & ZMQ_POLLOUT) {
471 catch (std::exception& e) {
479 if (!bSuccessSending) {
480 if (!HandleSendingError()) bKeepTrying =
false;
485 else if ((-1) == nPoll)
487 if (!HandlePollingError()) bKeepTrying =
false;
506 return bSuccessSending;
514 if (!bNewPath)
Connect(strConnectPath);
518 return Send(ascEnvelope);
530 OTLog::vError(
"%s: Error: %s must exist to Receive!\n", __FUNCTION__,
531 "m_pzmq->context_zmq");
538 OTLog::vError(
"%s: Error: %s must exist to Receive!\n", __FUNCTION__,
539 "m_pzmq->socket_zmq");
547 zmq::message_t zmq_message;
549 bool bSuccessReceiving =
false;
559 catch (std::exception& e) {
567 int64_t lDoubling = lLatencyRecvMilliSec;
569 bool expect_reply =
true;
570 while (expect_reply) {
572 zmq::pollitem_t items[] = {{*m_pzmq->
socket_zmq, 0, ZMQ_POLLIN, 0}};
576 nPoll = zmq::poll(&items[0], 1, static_cast<long>(lDoubling));
578 catch (std::exception& e) {
587 if (items[0].revents & ZMQ_POLLIN) {
593 catch (std::exception& e) {
600 if (!bSuccessReceiving) {
601 if (!HandleReceivingError()) expect_reply =
false;
606 else if (nReceiveTries == 0) {
608 expect_reply =
false;
611 else if ((-1) == nPoll)
613 if (!HandlePollingError()) expect_reply =
false;
620 if (bSuccessReceiving && (zmq_message.size() > 0))
621 strServerReply.
MemSet(static_cast<const char*>(zmq_message.data()),
622 static_cast<uint32_t>(zmq_message.size()));
624 return (bSuccessReceiving && (zmq_message.size() > 0));
627 bool OTSocket_ZMQ_4::HandlePollingError()
629 bool bRetVal =
false;
635 OTLog::Error(
"OTSocket::HandlePollingError: Failure: At least one of "
636 "the members of the items array refers to a socket whose "
637 "associated ØMQ context was terminated. (Deleting and "
638 "re-creating the context.)\n");
643 OTLog::Error(
"OTSocket::HandlePollingError: Failed: The provided "
644 "polling items were not valid (nullptr).\n");
649 OTLog::Error(
"OTSocket::HandlePollingError: The operation was "
650 "interrupted by delivery of a signal before any events "
651 "were available. Re-trying...\n");
656 "OTSocket::HandlePollingError: Default case. Re-trying...\n");
663 bool OTSocket_ZMQ_4::HandleSendingError()
665 bool bRetVal =
false;
671 OTLog::vOutput(0,
"OTSocket::HandleSendingError: Non-blocking mode was "
672 "requested and the message cannot be sent at the "
673 "moment. Re-trying...\n");
678 OTLog::Error(
"OTSocket::HandleSendingError: failure: The zmq_send() "
679 "operation is not supported by this socket type.\n");
688 "operation cannot be performed on this socket at the "
689 "moment due to the socket not being in the "
690 "appropriate state. Deleting socket and "
697 OTLog::Error(
"OTSocket::HandleSendingError: The ØMQ context associated "
698 "with the specified socket was terminated. (Deleting and "
699 "re-creating the context and the socket, and trying "
706 OTLog::Error(
"OTSocket::HandleSendingError: The provided socket was "
707 "invalid. (Deleting socket and re-trying...)\n");
714 OTLog::Error(
"OTSocket::HandleSendingError: The operation was "
715 "interrupted by delivery of a signal before the message "
716 "was sent. (Re-trying...)\n");
721 OTLog::Error(
"OTSocket::HandleSendingError: Failure: The provided "
722 "pollitems were not valid (nullptr).\n");
726 "OTSocket::HandleSendingError: Default case. Re-trying...\n");
733 bool OTSocket_ZMQ_4::HandleReceivingError()
735 bool bRetVal =
false;
741 OTLog::vOutput(0,
"OTSocket::HandleReceivingError: Non-blocking mode "
742 "was requested and no messages are available at the "
743 "moment. Re-trying...\n");
748 OTLog::Error(
"OTSocket::HandleReceivingError: Failure: The zmq_recv() "
749 "operation is not supported by this socket type.\n");
757 OTLog::vOutput(0,
"OTSocket::HandleReceivingError: The zmq_recv() "
758 "operation cannot be performed on this socket at the "
759 "moment due to the socket not being in the "
760 "appropriate state. (Deleting socket and "
765 bRetVal =
Send(ascTemp);
770 OTLog::Error(
"OTSocket::HandleReceivingError: The ØMQ context "
771 "associated with the specified socket was terminated. "
772 "(Re-creating the context, and trying again...)\n");
776 bRetVal =
Send(ascTemp);
781 OTLog::Error(
"OTSocket::HandleReceivingError: The provided socket was "
782 "invalid. (Deleting socket and re-trying.)\n");
786 bRetVal =
Send(ascTemp);
792 OTLog::Error(
"OTSocket::HandleSendingError: The operation was "
793 "interrupted by delivery of a signal before the message "
794 "was sent. (Re-trying...)\n");
799 OTLog::Error(
"OTSocket::HandleReceivingError: Failure: The message "
800 "passed to the function was invalid.\n");
804 "OTSocket::HandleReceivingError: Default case. Re-trying...\n");
static EXPORT void vError(const char *szError,...)
EXPORT bool RemakeSocket(const bool bNewContext=false)
int64_t m_lLatencyDelayAfter
OTString m_strConnectPath
int32_t m_nLatencyReceiveNoTries
int32_t m_nLatencySendNoTries
zmq::context_t * context_zmq
zmq::socket_t * socket_zmq
OTString m_strBindingPath
EXPORT uint32_t GetLength() const
EXPORT bool Send(const OTASCIIArmor &ascEnvelope)
EXPORT bool Exists() const
static EXPORT void Error(const char *szError)
EXPORT bool Compare(const char *compare) const
EXPORT void Set(const char *data, uint32_t enforcedMaxLength=0)
int64_t m_lLatencyReceiveMs
static EXPORT bool SleepMilliseconds(int64_t lMilliseconds)
OTASCIIArmor m_ascLastMsgSent
EXPORT const char * Get() const
EXPORT bool Receive(OTString &strServerReply)
EXPORT bool MemSet(const char *mem, uint32_t size)
static EXPORT void vOutput(int32_t nVerbosity, const char *szOutput,...)