Open-Transactions  0.93.0-ge03d287
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
opentxs::MessageProcessor Class Reference

#include <MessageProcessor.hpp>

Public Member Functions

EXPORT MessageProcessor (ServerLoader &loader)
 
EXPORT void run ()
 

Detailed Description

Definition at line 145 of file MessageProcessor.hpp.

Constructor & Destructor Documentation

opentxs::MessageProcessor::MessageProcessor ( ServerLoader & loader)
explicit

Definition at line 164 of file MessageProcessor.cpp.

165  : server_(loader.getServer())
166  , socket_()
167 {
168  init(loader.getPort());
169 }

Member Function Documentation

void opentxs::MessageProcessor::run ( )

Definition at line 214 of file MessageProcessor.cpp.

215 {
216  for (;;) {
217  // =-=-=- HEARTBEAT -=-=-=
218  //
219  // The Server now processes certain things on a regular basis.
220  // ProcessCron is what gives it the opportunity to do that.
221  // All of the Cron Items (including market trades, payment plans, smart
222  // contracts...)
223  // they all have their hooks here...
224  //
225  // Internally this is smart enough to know how often to actually
226  // activate itself.
227  server_->ProcessCron();
228  // Most often it just returns doing nothing (waiting for its timer.)
229  // Wait for client http requests (and process replies out to them.)
230  // Number of requests to process per heartbeat:
231  // ServerSettings::GetHeartbeatNoRequests()
232  //
233  // Loop: process up to 10 client requests, then sleep for 1/10th second.
234  // That's a total of 100 requests per second. Can the computers handle
235  // it?
236  // Is it too much or too little? Todo: load testing.
237  //
238  // Then: check for shutdown flag.
239  //
240  // Then: go back to the top ("do") and repeat the loop.... process cron,
241  // process 10 client requests, sleep, check for shutdown, etc.
242 
243  Timer t;
244  t.start();
245  double startTick = t.getElapsedTimeInMilliSec();
246 
247  // PROCESS X NUMBER OF REQUESTS (THIS PULSE.)
248  //
249  // Theoretically the "number of requests" that we process EACH PULSE.
250  // (The timing code here is still pretty new, need to do some load
251  // testing.)
252  for (int i = 0; i < ServerSettings::GetHeartbeatNoRequests(); i++) {
253  OTString messageString;
254 
255  // With 100ms heartbeat, receive will try 100 ms, then 200 ms, then
256  // 400 ms, total of 700.
257  // That's about 15 Receive() calls every 10 seconds. Therefore if I
258  // want the ProcessCron()
259  // to trigger every 10 seconds, I need to set the cron interval to
260  // roll over every 15 heartbeats.
261  // Therefore I will be using a real Timer for Cron, instead of the
262  // damn intervals.
263  bool received = socket_.Receive(messageString);
264 
265  if (received) {
266  if (messageString.GetLength() <= 0) {
267  OTLog::Error("server main: Received a message, but of 0 "
268  "length or less. Weird. (Skipping it.)\n");
269  }
270  else {
271  std::string strMsg(messageString.Get());
272  std::string reply;
273  bool shouldDisconnect = processMessage(strMsg, reply);
274 
275  if (reply.length() <= 0 || shouldDisconnect) {
276  OTLog::vOutput(
277  0, "server main: ERROR: Unfortunately, not every "
278  "client request is "
279  "legible or worthy of a server response. :-) "
280  "Msg:\n\n%s\n\n",
281  strMsg.c_str());
282 
283  socket_.Listen();
284  }
285  else {
286  bool successSending = socket_.Send(reply.c_str());
287 
288  if (!successSending) {
289  OTLog::vError("server main: Socket ERROR: failed "
290  "while trying to send reply "
291  "back to client! \n\n "
292  "MESSAGE:\n%s\n\nREPLY:\n%s\n\n",
293  strMsg.c_str(), reply.c_str());
294  }
295  }
296  }
297  }
298  }
299 
300  // IF the time we had available wasn't all used up -- if some of it is
301  // still available, then SLEEP until we reach the NEXT PULSE. (In
302  // practice, we will probably use TOO MUCH time, not too little--but
303  // then again OT isn't ALWAYS processing a message. There could be
304  // plenty of dead time in between...)
305  double endTick = t.getElapsedTimeInMilliSec();
306  int64_t elapsed = static_cast<int64_t>(endTick - startTick);
307 
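308  if (elapsed < ServerSettings::GetHeartbeatMsBetweenBeats()) {
309  int64_t ms = ServerSettings::GetHeartbeatMsBetweenBeats() - elapsed;
310  OTLog::SleepMilliseconds(ms);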
311  }
312 
313  if (server_->IsFlaggedForShutdown()) {
314  OTLog::Output(0, "opentxs server is shutting down gracefully.\n");
315  break;
316  }
317  }
318 }
static EXPORT void vError(const char *szError,...)
Definition: OTLog.cpp:800
static EXPORT void Output(int32_t nVerbosity, const char *szOutput)
Definition: OTLog.cpp:710
Definition: Timer.hpp:31
EXPORT double getElapsedTimeInMilliSec()
Definition: Timer.cpp:111
bool IsFlaggedForShutdown() const
Definition: OTServer.cpp:350
EXPORT void start()
Definition: Timer.cpp:41
static int32_t GetHeartbeatNoRequests()
EXPORT bool Send(const OTASCIIArmor &ascEnvelope)
static EXPORT void Error(const char *szError)
Definition: OTLog.cpp:831
static int32_t GetHeartbeatMsBetweenBeats()
static EXPORT bool SleepMilliseconds(int64_t lMilliseconds)
Definition: OTLog.cpp:651
EXPORT bool Receive(OTString &strServerReply)
static EXPORT void vOutput(int32_t nVerbosity, const char *szOutput,...)
Definition: OTLog.cpp:768

The documentation for this class was generated from the following files: