diff options
author | Vladimir Mezentsev <vladimir.mezentsev@oracle.com> | 2022-03-11 08:58:31 +0000 |
---|---|---|
committer | Nick Clifton <nickc@redhat.com> | 2022-03-11 08:58:31 +0000 |
commit | bb368aad297fe3ad40cf397e6fc85aa471429a28 (patch) | |
tree | 0ab25909b8fe789d676bbdb00d501d4d485e4afe /gprofng/src/ipcio.cc | |
parent | a655f19af95eb685ba64f48ee8fc2b3b7a3d886a (diff) | |
download | binutils-bb368aad297fe3ad40cf397e6fc85aa471429a28.zip binutils-bb368aad297fe3ad40cf397e6fc85aa471429a28.tar.gz binutils-bb368aad297fe3ad40cf397e6fc85aa471429a28.tar.bz2 |
gprofng: a new GNU profiler
top-level
* Makefile.def: Add gprofng module.
* configure.ac: Add --enable-gprofng option.
* src-release.sh: Add gprofng.
* Makefile.in: Regenerate.
* configure: Regenerate.
* gprofng: New directory.
binutils
* MAINTAINERS: Add gprofng maintainer.
* README-how-to-make-a-release: Add gprofng.
include.
* collectorAPI.h: New file.
* libcollector.h: New file.
* libfcollector.h: New file.
Diffstat (limited to 'gprofng/src/ipcio.cc')
-rw-r--r-- | gprofng/src/ipcio.cc | 1025 |
1 files changed, 1025 insertions, 0 deletions
diff --git a/gprofng/src/ipcio.cc b/gprofng/src/ipcio.cc new file mode 100644 index 0000000..57f2617 --- /dev/null +++ b/gprofng/src/ipcio.cc @@ -0,0 +1,1025 @@ +/* Copyright (C) 2021 Free Software Foundation, Inc. + Contributed by Oracle. + + This file is part of GNU Binutils. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3, or (at your option) + any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, 51 Franklin Street - Fifth Floor, Boston, + MA 02110-1301, USA. */ + +#include "config.h" +#include <stdio.h> +#include <stdlib.h> +#include <signal.h> +#include <unistd.h> +#include <iostream> +#include <iomanip> +#include <sstream> +#include <queue> +#include "vec.h" +#include "util.h" +#include "ipcio.h" +#include "DbeThread.h" +#include "Experiment.h" + +#define ipc_trace if (ipc_flags) ipc_default_log +#define ipc_request_trace if (ipc_flags) ipc_request_log +#define ipc_response_trace if (ipc_flags) ipc_response_log + +using namespace std; + +// IPC implementation +static const int L_PROGRESS = 0; +static const int L_INTEGER = 1; +static const int L_BOOLEAN = 2; +static const int L_LONG = 3; +static const int L_STRING = 4; +static const int L_DOUBLE = 5; +static const int L_ARRAY = 6; +static const int L_OBJECT = 7; +static const int L_CHAR = 8; + +int currentRequestID; +int currentChannelID; +static long maxSize; + +extern int cancellableChannelID; +extern int error_flag; +extern int ipc_delay_microsec; +extern FILE *responseLogFileP; + +IPCresponse *IPCresponseGlobal; + +BufferPool *responseBufferPool; + +IPCrequest::IPCrequest (int sz, int reqID, int chID) +{ + size = sz; + requestID = reqID; + channelID = chID; + status = INITIALIZED; + idx = 0; + buf = (char *) malloc (size); + cancelImmediate = false; +} + +IPCrequest::~IPCrequest () +{ + free (buf); +} + +void +IPCrequest::read (void) +{ + for (int i = 0; i < size; i++) + { + int c = getc (stdin); + ipc_request_trace (TRACE_LVL_4, " IPCrequest:getc(stdin): %02x\n", c); + buf[i] = c; + } +} + +IPCrequestStatus +IPCrequest::getStatus (void) +{ + return status; +} + +void +IPCrequest::setStatus (IPCrequestStatus newStatus) +{ + status = newStatus; +} + +static int +readByte (IPCrequest* req) +{ + int c; + int val = 0; + for (int i = 0; i < 2; i++) + { + if (req == NULL) + { + c = getc (stdin); + ipc_request_trace (TRACE_LVL_4, " readByte:getc(stdin): %02x\n", c); + } + else + c = req->rgetc (); + switch (c) + { + case '0': case '1': case '2': case '3': + case '4': case '5': case '6': case '7': + case '8': case '9': + val = val * 16 + c - '0'; + break; + case 'a': case 'b': case 'c': case 'd': case 'e': case 'f': + val = val * 16 + c - 'a' + 10; + break; + case EOF: + val = EOF; + break; + default: + fprintf (stderr, "readByte: Unknown byte: %d\n", c); + break; + } + } + return val; +} + +static int +readIVal (IPCrequest *req) +{ + int val = readByte (req); + for (int i = 0; i < 3; i++) + val = val * 256 + readByte (req); + ipc_trace (" readIVal: %d\n", val); + return val; +} + +static String +readSVal (IPCrequest *req) +{ + int len = readIVal (req); + if (len == -1) + { + ipc_trace (" readSVal: <NULL>\n"); + return NULL; + } + char *str = (char *) malloc (len + 1); + char *s = str; + *s = (char) 0; + while (len--) + *s++ = req->rgetc (); + *s = (char) 0; + ipc_trace (" readSVal: '%s'\n", str); + return str; +} + +static long long +readLVal (IPCrequest *req) +{ + long long val = readByte (req); + for (int i = 0; i < 7; i++) + val = val * 256 + readByte (req); + ipc_trace (" readLVal: %lld\n", val); + return val; +} + +static bool +readBVal (IPCrequest *req) +{ + int val = readByte (req); + ipc_trace (" readBVal: %s\n", val == 0 ? "true" : "false"); + return val != 0; +} + +static char +readCVal (IPCrequest *req) +{ + int val = readByte (req); + ipc_trace (" readCVal: %d\n", val); + return (char) val; +} + +static double +readDVal (IPCrequest *req) +{ + String s = readSVal (req); + double d = atof (s); + free (s); + return d; +} + +static Object +readAVal (IPCrequest *req) +{ + bool twoD = false; + int type = readByte (req); + if (type == L_ARRAY) + { + twoD = true; + type = readByte (req); + } + ipc_trace ("readAVal: twoD=%s type=%d\n", twoD ? "true" : "false", type); + + int len = readIVal (req); + if (len == -1) + return NULL; + switch (type) + { + case L_INTEGER: + if (twoD) + { + Vector<Vector<int>*> *array = new Vector<Vector<int>*>(len); + for (int i = 0; i < len; i++) + array->store (i, (Vector<int>*)readAVal (req)); + return array; + } + else + { + Vector<int> *array = new Vector<int>(len); + for (int i = 0; i < len; i++) + array->store (i, readIVal (req)); + return array; + } + //break; + case L_LONG: + if (twoD) + { + Vector<Vector<long long>*> *array = new Vector<Vector<long long>*>(len); + for (int i = 0; i < len; i++) + array->store (i, (Vector<long long>*)readAVal (req)); + return array; + } + else + { + Vector<long long> *array = new Vector<long long>(len); + for (int i = 0; i < len; i++) + array->store (i, readLVal (req)); + return array; + } + //break; + case L_DOUBLE: + if (twoD) + { + Vector<Vector<double>*> *array = new Vector<Vector<double>*>(len); + for (int i = 0; i < len; i++) + array->store (i, (Vector<double>*)readAVal (req)); + return array; + } + else + { + Vector<double> *array = new Vector<double>(len); + for (int i = 0; i < len; i++) + array->store (i, readDVal (req)); + return array; + } + //break; + case L_BOOLEAN: + if (twoD) + { + Vector < Vector<bool>*> *array = new Vector < Vector<bool>*>(len); + for (int i = 0; i < len; i++) + array->store (i, (Vector<bool>*)readAVal (req)); + return array; + } + else + { + Vector<bool> *array = new Vector<bool>(len); + for (int i = 0; i < len; i++) + array->store (i, readBVal (req)); + return array; + } + //break; + case L_CHAR: + if (twoD) + { + Vector<Vector<char>*> *array = new Vector<Vector<char>*>(len); + for (int i = 0; i < len; i++) + array->store (i, (Vector<char>*)readAVal (req)); + return array; + } + else + { + Vector<char> *array = new Vector<char>(len); + for (int i = 0; i < len; i++) + array->store (i, readCVal (req)); + return array; + } + //break; + case L_STRING: + if (twoD) + { + Vector<Vector<String>*> *array = new Vector<Vector<String>*>(len); + for (int i = 0; i < len; i++) + array->store (i, (Vector<String>*)readAVal (req)); + return array; + } + else + { + Vector<String> *array = new Vector<String>(len); + for (int i = 0; i < len; i++) + array->store (i, readSVal (req)); + return array; + } + //break; + case L_OBJECT: + if (twoD) + { + Vector<Vector<Object>*> *array = new Vector<Vector<Object>*>(len); + for (int i = 0; i < len; i++) + array->store (i, (Vector<Object>*)readAVal (req)); + return array; + } + else + { + Vector<Object> *array = new Vector<Object>(len); + for (int i = 0; i < len; i++) + array->store (i, readAVal (req)); + return array; + } + //break; + default: + fprintf (stderr, "readAVal: Unknown code: %d\n", type); + break; + } + return NULL; +} + +static int iVal; +static bool bVal; +static long long lVal; +static String sVal; +static double dVal; +static Object aVal; + +static void +readResult (int type, IPCrequest *req) +{ + int tVal = readByte (req); + switch (tVal) + { + case L_INTEGER: + iVal = readIVal (req); + break; + case L_LONG: + lVal = readLVal (req); + break; + case L_BOOLEAN: + bVal = readBVal (req); + break; + case L_DOUBLE: + dVal = readDVal (req); + break; + case L_STRING: + sVal = readSVal (req); + break; + case L_ARRAY: + aVal = readAVal (req); + break; + case EOF: + fprintf (stderr, "EOF read in readResult\n"); + sVal = NULL; + return; + default: + fprintf (stderr, "Unknown code: %d\n", tVal); + abort (); + } + if (type != tVal) + { + fprintf (stderr, "Internal error: readResult: parameter mismatch: type=%d should be %d\n", tVal, type); + abort (); + } +} + +int +readInt (IPCrequest *req) +{ + readResult (L_INTEGER, req); + return iVal; +} + +String +readString (IPCrequest *req) +{ + readResult (L_STRING, req); + return sVal; +} + +long long +readLong (IPCrequest *req) +{ + readResult (L_LONG, req); + return lVal; +} + +double +readDouble (IPCrequest *req) +{ + readResult (L_DOUBLE, req); + return dVal; +} + +bool +readBoolean (IPCrequest *req) +{ + readResult (L_BOOLEAN, req); + return bVal; +} + +DbeObj +readObject (IPCrequest *req) +{ + readResult (L_LONG, req); + return (DbeObj) lVal; +} + +Object +readArray (IPCrequest *req) +{ + readResult (L_ARRAY, req); + return aVal; +} + +// Write +IPCresponse::IPCresponse (int sz) +{ + requestID = -1; + channelID = -1; + responseType = -1; + responseStatus = RESPONSE_STATUS_SUCCESS; + sb = new StringBuilder (sz); + next = NULL; +} + +IPCresponse::~IPCresponse () +{ + delete sb; +} + +void +IPCresponse::reset () +{ + requestID = -1; + channelID = -1; + responseType = -1; + responseStatus = RESPONSE_STATUS_SUCCESS; + sb->setLength (0); +} + +void +IPCresponse::sendByte (int b) +{ + ipc_trace ("sendByte: %02x %d\n", b, b); + sb->appendf ("%02x", b); +} + +void +IPCresponse::sendIVal (int i) +{ + ipc_trace ("sendIVal: %08x %d\n", i, i); + sb->appendf ("%08x", i); +} + +void +IPCresponse::sendLVal (long long l) +{ + ipc_trace ("sendLVal: %016llx %lld\n", l, l); + sb->appendf ("%016llx", l); +} + +void +IPCresponse::sendSVal (const char *s) +{ + if (s == NULL) + { + sendIVal (-1); + return; + } + sendIVal ((int) strlen (s)); + ipc_trace ("sendSVal: %s\n", s); + sb->appendf ("%s", s); +} + +void +IPCresponse::sendBVal (bool b) +{ + sendByte (b ? 1 : 0); +} + +void +IPCresponse::sendCVal (char c) +{ + sendByte (c); +} + +void +IPCresponse::sendDVal (double d) +{ + char str[32]; + snprintf (str, sizeof (str), "%.12f", d); + sendSVal (str); +} + +void +IPCresponse::sendAVal (void *ptr) +{ + if (ptr == NULL) + { + sendByte (L_INTEGER); + sendIVal (-1); + return; + } + + VecType type = ((Vector<void*>*)ptr)->type (); + switch (type) + { + case VEC_INTEGER: + { + sendByte (L_INTEGER); + Vector<int> *array = (Vector<int>*)ptr; + sendIVal (array->size ()); + for (int i = 0; i < array->size (); i++) + sendIVal (array->fetch (i)); + break; + } + case VEC_BOOL: + { + sendByte (L_BOOLEAN); + Vector<bool> *array = (Vector<bool>*)ptr; + sendIVal (array->size ()); + for (int i = 0; i < array->size (); i++) + sendBVal (array->fetch (i)); + break; + } + case VEC_CHAR: + { + sendByte (L_CHAR); + Vector<char> *array = (Vector<char>*)ptr; + sendIVal (array->size ()); + for (int i = 0; i < array->size (); i++) + sendCVal (array->fetch (i)); + break; + } + case VEC_LLONG: + { + sendByte (L_LONG); + Vector<long long> *array = (Vector<long long>*)ptr; + sendIVal (array->size ()); + for (int i = 0; i < array->size (); i++) + sendLVal (array->fetch (i)); + break; + } + case VEC_DOUBLE: + { + sendByte (L_DOUBLE); + Vector<double> *array = (Vector<double>*)ptr; + sendIVal (array->size ()); + for (int i = 0; i < array->size (); i++) + sendDVal (array->fetch (i)); + break; + } + case VEC_STRING: + { + sendByte (L_STRING); + Vector<String> *array = (Vector<String>*)ptr; + sendIVal (array->size ()); + for (int i = 0; i < array->size (); i++) + sendSVal (array->fetch (i)); + break; + } + case VEC_STRINGARR: + { + sendByte (L_ARRAY); + sendByte (L_STRING); + Vector<void*> *array = (Vector<void*>*)ptr; + sendIVal (array->size ()); + for (int i = 0; i < array->size (); i++) + sendAVal (array->fetch (i)); + break; + } + case VEC_INTARR: + { + sendByte (L_ARRAY); + sendByte (L_INTEGER); + Vector<void*> *array = (Vector<void*>*)ptr; + sendIVal (array->size ()); + for (int i = 0; i < array->size (); i++) + sendAVal (array->fetch (i)); + break; + } + case VEC_LLONGARR: + { + sendByte (L_ARRAY); + sendByte (L_LONG); + Vector<void*> *array = (Vector<void*>*)ptr; + sendIVal (array->size ()); + for (int i = 0; i < array->size (); i++) + sendAVal (array->fetch (i)); + break; + } + case VEC_VOIDARR: + { + sendByte (L_OBJECT); + Vector<void*> *array = (Vector<void*>*)ptr; + sendIVal (array->size ()); + for (int i = 0; i < array->size (); i++) + sendAVal (array->fetch (i)); + break; + } + default: + fprintf (stderr, "sendAVal: Unknown type: %d\n", type); + abort (); + } +} + +static void +writeResponseHeader (int requestID, int responseType, int responseStatus, int nBytes) +{ + if (responseType == RESPONSE_TYPE_HANDSHAKE) + nBytes = IPC_VERSION_NUMBER; + int use_write = 2; + ipc_response_trace (TRACE_LVL_1, "ResponseHeaderBegin----- %x ---- %x ----- %x -----%x -------\n", requestID, responseType, responseStatus, nBytes); + if (use_write) + { + char buf[23]; + if (use_write == 1) + { + int i = 0; + snprintf (buf + i, 3, "%2x", HEADER_MARKER); + i += 2; + snprintf (buf + i, 9, "%8x", requestID); + i += 8; + snprintf (buf + i, 3, "%2x", responseType); + i += 2; + snprintf (buf + i, 3, "%2x", responseStatus); + i += 2; + snprintf (buf + i, 9, "%8x", nBytes); + } + else + snprintf (buf, 23, "%02x%08x%02x%02x%08x", HEADER_MARKER, requestID, + responseType, responseStatus, nBytes); + buf[22] = 0; + write (1, buf, 22); + } + else + { + cout << setfill ('0') << setw (2) << hex << HEADER_MARKER; + cout << setfill ('0') << setw (8) << hex << requestID; + cout << setfill ('0') << setw (2) << hex << responseType; + cout << setfill ('0') << setw (2) << hex << responseStatus; + cout << setfill ('0') << setw (8) << hex << nBytes; + cout.flush (); + } + ipc_response_trace (TRACE_LVL_1, "----------------------------ResponseHeaderEnd\n"); + if (nBytes > maxSize) + { + maxSize = nBytes; + ipc_trace ("New maxsize %ld\n", maxSize); + } +} + +bool +cancelNeeded (int chID) +{ + if (chID == cancellableChannelID && chID == cancelRequestedChannelID) + return true; + else + return false; +} + +static void +writeResponseWithHeader (int requestID, int channelID, int responseType, + int responseStatus, IPCresponse* os) +{ + if (cancelNeeded (channelID)) + { + responseStatus = RESPONSE_STATUS_CANCELLED; + ipc_trace ("CANCELLING %d %d\n", requestID, channelID); + // This is for gracefully cancelling regular ops like openExperiment - getFiles should never reach here + } + os->setRequestID (requestID); + os->setChannelID (channelID); + os->setResponseType (responseType); + os->setResponseStatus (responseStatus); + os->print (); + os->reset (); + responseBufferPool->recycle (os); +} + +void +writeAckFast (int requestID) +{ + writeResponseHeader (requestID, RESPONSE_TYPE_ACK, RESPONSE_STATUS_SUCCESS, 0); +} + +void +writeAck (int requestID, int channelID) +{ +#if DEBUG + char *s = getenv (NTXT ("SP_NO_IPC_ACK")); +#else /* ^DEBUG */ + char *s = NULL; +#endif /* ^DEBUG */ + if (s) + { + int i = requestID; + int j = channelID; + ipc_request_trace (TRACE_LVL_4, "ACK skipped: requestID=%d channelID=%d\n", i, j); + } + else + { + IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL); + writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_ACK, + RESPONSE_STATUS_SUCCESS, OUTS); + } +} + +void +writeHandshake (int requestID, int channelID) +{ + IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL); + writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, OUTS); + // writeResponseHeader(requestID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, IPC_VERSION_NUMBER); +} + +void +writeResponseGeneric (int responseStatus, int requestID, int channelID) +{ + IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL); + writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_COMPLETE, responseStatus, OUTS); +} + +BufferPool::BufferPool () +{ + pthread_mutex_init (&p_mutex, NULL); + smallBuf = NULL; + largeBuf = NULL; +} + +BufferPool::~BufferPool () +{ + for (IPCresponse *p = smallBuf; p;) + { + IPCresponse *tmp = p; + p = tmp->next; + delete tmp; + } + for (IPCresponse *p = largeBuf; p;) + { + IPCresponse *tmp = p; + p = tmp->next; + delete tmp; + } +} + +IPCresponse* +BufferPool::getNewResponse (int size) +{ + pthread_mutex_lock (&p_mutex); + if (ipc_single_threaded_mode && size < BUFFER_SIZE_LARGE) + size = BUFFER_SIZE_LARGE; + IPCresponse *newResponse = NULL; + if (size >= BUFFER_SIZE_LARGE) + { + if (largeBuf) + { + newResponse = largeBuf; + largeBuf = largeBuf->next; + } + } + else if (smallBuf) + { + newResponse = smallBuf; + smallBuf = smallBuf->next; + } + if (newResponse) + newResponse->reset (); + else + { + newResponse = new IPCresponse (size); + ipc_trace ("GETNEWBUFFER %d\n", size); + } + pthread_mutex_unlock (&p_mutex); + return newResponse; +} + +void +BufferPool::recycle (IPCresponse *respB) +{ + pthread_mutex_lock (&p_mutex); + if (respB->getCurBufSize () >= BUFFER_SIZE_LARGE) + { + respB->next = largeBuf; + largeBuf = respB; + } + else + { + respB->next = smallBuf; + smallBuf = respB; + } + pthread_mutex_unlock (&p_mutex); +} + +void +writeArray (void *ptr, IPCrequest* req) +{ + if (req->getStatus () == CANCELLED_IMMEDIATE) + return; + IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE); + OUTS->sendByte (L_ARRAY); + OUTS->sendAVal (ptr); + writeResponseWithHeader (req->getRequestID (), req->getChannelID (), + RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); +} + +void +writeString (const char *s, IPCrequest* req) +{ + if (req->getStatus () == CANCELLED_IMMEDIATE) + return; + IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE); + OUTS->sendByte (L_STRING); + OUTS->sendSVal (s); + writeResponseWithHeader (req->getRequestID (), req->getChannelID (), + RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); +} + +void +writeObject (DbeObj obj, IPCrequest* req) +{ + writeLong ((long long) obj, req); +} + +void +writeBoolean (bool b, IPCrequest* req) +{ + if (req->getStatus () == CANCELLED_IMMEDIATE) + return; + IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); + OUTS->sendByte (L_BOOLEAN); + OUTS->sendBVal (b); + writeResponseWithHeader (req->getRequestID (), req->getChannelID (), + RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); +} + +void +writeInt (int i, IPCrequest* req) +{ + if (req->getStatus () == CANCELLED_IMMEDIATE) + return; + IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); + OUTS->sendByte (L_INTEGER); + OUTS->sendIVal (i); + writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); +} + +void +writeChar (char c, IPCrequest* req) +{ + if (req->getStatus () == CANCELLED_IMMEDIATE) + return; + IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); + OUTS->sendByte (L_CHAR); + OUTS->sendCVal (c); + writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); +} + +void +writeLong (long long l, IPCrequest* req) +{ + if (req->getStatus () == CANCELLED_IMMEDIATE) + return; + IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); + OUTS->sendByte (L_LONG); + OUTS->sendLVal (l); + writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); +} + +void +writeDouble (double d, IPCrequest* req) +{ + if (req->getStatus () == CANCELLED_IMMEDIATE) return; + IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM); + OUTS->sendByte (L_DOUBLE); + OUTS->sendDVal (d); + writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS); +} + +int +setProgress (int percentage, const char *proc_str) +{ + if (cancelNeeded (currentChannelID)) + { + // ExperimentLoadCancelException *e1 = new ExperimentLoadCancelException(); + // throw (e1); + return 1; + } + if (NULL == proc_str) + return 1; + int size = strlen (proc_str) + 100; // 100 bytes for additional data + int bs = BUFFER_SIZE_MEDIUM; + if (size > BUFFER_SIZE_MEDIUM) + { + if (size > BUFFER_SIZE_LARGE) return 1; // This should never happen + bs = BUFFER_SIZE_LARGE; + } + IPCresponse *OUTS = responseBufferPool->getNewResponse (bs); + OUTS->sendByte (L_PROGRESS); + OUTS->sendIVal (percentage); + OUTS->sendSVal (proc_str); + writeResponseWithHeader (currentRequestID, currentChannelID, RESPONSE_TYPE_PROGRESS, RESPONSE_STATUS_SUCCESS, OUTS); + return 0; +} + +void +IPCresponse::print (void) +{ + if (ipc_delay_microsec) + usleep (ipc_delay_microsec); + int stringSize = sb->length (); + writeResponseHeader (requestID, responseType, responseStatus, stringSize); + if (stringSize > 0) + { + char *s = sb->toString (); + hrtime_t start_time = gethrtime (); + int use_write = 1; + if (use_write) + write (1, s, stringSize); // write(1, sb->toString(), stringSize); + else + { + cout << s; + cout.flush (); + } + hrtime_t end_time = gethrtime (); + unsigned long long time_stamp = end_time - start_time; + ipc_response_log (TRACE_LVL_3, "ReqID %x flush time %llu nanosec \n", requestID, time_stamp); + free (s); + } +} + +void +setCancelRequestedCh (int chID) +{ + cancelRequestedChannelID = chID; +} + +void +readRequestHeader () +{ + int marker = readByte (NULL); + if (marker != HEADER_MARKER) + { + fprintf (stderr, "Internal error: received request (%d) without header marker\n", marker); + error_flag = 1; + return; + } + else + ipc_request_trace (TRACE_LVL_1, "RequestHeaderBegin------------------------\n"); + int requestID = readIVal (NULL); + int requestType = readByte (NULL); + int channelID = readIVal (NULL); + int nBytes = readIVal (NULL); + if (requestType == REQUEST_TYPE_HANDSHAKE) + { + // write the ack directly to the wire, not through the response queue + // writeAckFast(requestID); + writeAck (requestID, channelID); + maxSize = 0; + writeHandshake (requestID, channelID); + ipc_request_trace (TRACE_LVL_1, "RQ: HANDSHAKE --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes); + } + else if (requestType == REQUEST_TYPE_CANCEL) + { + writeAck (requestID, channelID); + ipc_request_trace (TRACE_LVL_1, "RQ: CANCEL --- RQ: %x ----- %x --- CH: %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes); + if (channelID == cancellableChannelID) + { + // we have worked on at least one request belonging to this channel + writeResponseGeneric (RESPONSE_STATUS_SUCCESS, requestID, channelID); + setCancelRequestedCh (channelID); + ipc_trace ("CANCELLABLE %x %x\n", channelID, currentChannelID); + if (channelID == currentChannelID) + // request for this channel is currently in progress + ipc_request_trace (TRACE_LVL_1, "IN PROGRESS REQUEST NEEDS CANCELLATION"); + // ssp_post_cond(waitingToFinish); + } + else + { + // FIXME: + // it is possible that a request for this channel is on the requestQ + // or has been submitted to the work group queue but is waiting for a thread to pick it up + writeResponseGeneric (RESPONSE_STATUS_FAILURE, requestID, channelID); + setCancelRequestedCh (channelID); + ipc_request_trace (TRACE_LVL_1, "RETURNING FAILURE TO CANCEL REQUEST channel %d\n", channelID); + } + } + else + { + writeAck (requestID, channelID); + ipc_request_trace (TRACE_LVL_1, "RQ: --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes); + IPCrequest *nreq = new IPCrequest (nBytes, requestID, channelID); + nreq->read (); + ipc_request_trace (TRACE_LVL_1, "RQ: --- %x Read from stream \n", requestID); + if (cancelNeeded (channelID)) + { + ipc_request_trace (TRACE_LVL_1, "CANCELLABLE REQ RECVD %x %x\n", channelID, requestID); + writeResponseGeneric (RESPONSE_STATUS_CANCELLED, requestID, channelID); + delete nreq; + return; + } + DbeQueue *q = new DbeQueue (ipc_doWork, nreq); + ipcThreadPool->put_queue (q); + } +} |