/* Copyright (C) 2021-2023 Free Software Foundation, Inc.
Contributed by Oracle.
This file is part of GNU Binutils.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3, or (at your option)
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, 51 Franklin Street - Fifth Floor, Boston,
MA 02110-1301, USA. */
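/* Defines the external interface between er_ipc and the IPC I/O routines:
   wire-protocol constants, the request/response buffer classes, and the
   helpers that read from and write to the wire.  */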
#ifndef _IPCIO_H
#define _IPCIO_H
#include <pthread.h>
#include "gp-defs.h"
#include "StringBuilder.h"
/* Forward declaration: this header only declares pointers to DbeThreadPool
   (responseThreadPool and ipcThreadPool), so the full definition is not
   needed here.  */
class DbeThreadPool;
/* Value type returned by readObject () and accepted by writeObject ().  */
typedef long long DbeObj;
/* Value type returned by readArray ().  */
typedef void *Object;
/* Value type returned by readString ().  */
typedef char *String;
/* Buffer size classes.
   NOTE(review): the unit (presumably bytes) is not visible in this header --
   confirm against the code that allocates the buffers.  */
#define BUFFER_SIZE_SMALL 512
/* NOTE(review): identical to BUFFER_SIZE_SMALL -- confirm this is intended.  */
#define BUFFER_SIZE_MEDIUM 512
/* Parenthesized so that the product stays a single operand when the macro is
   used inside a larger expression (e.g. `x % BUFFER_SIZE_LARGE' or
   `x / BUFFER_SIZE_LARGE').  The value itself is unchanged: 1048576.  */
#define BUFFER_SIZE_LARGE (1024 * 1024)

/* Sentinel value; NOTE(review): presumably stored in the length field of a
   request header that carries no body -- confirm against the header parser.  */
#define REQUEST_HAS_NO_BODY 0xFFFFFFFF

/* Values for the response status (see IPCresponse::setResponseStatus).  */
#define RESPONSE_STATUS_DEFAULT 0
#define RESPONSE_STATUS_SUCCESS 1
#define RESPONSE_STATUS_FAILURE 2
#define RESPONSE_STATUS_CANCELLED 3

/* Values for the response type (see IPCresponse::setResponseType).  */
#define RESPONSE_TYPE_ACK 0
#define RESPONSE_TYPE_PROGRESS 1
#define RESPONSE_TYPE_COMPLETE 2
#define RESPONSE_TYPE_HANDSHAKE 3

/* NOTE(review): presumably the byte that introduces a message header on the
   wire -- confirm against the header reader/writer.  */
#define HEADER_MARKER 0xff

/* Request type codes.  */
#define REQUEST_TYPE_DEFAULT 0
#define REQUEST_TYPE_CANCEL 1
#define REQUEST_TYPE_HANDSHAKE 2

/* Protocol identification.  The string embeds the same number as
   IPC_VERSION_NUMBER; change both together.  */
#define IPC_PROTOCOL_STR "IPC_PROTOCOL_38"
#define IPC_VERSION_NUMBER 38
/* Processing state of a request; this is the type of IPCrequest::status and
   of the values passed through IPCrequest::getStatus ()/setStatus ().
   The numeric values are spelled out so that they stay fixed if enumerators
   are ever reordered.  */
enum IPCrequestStatus
{
  INITIALIZED         = 0,
  IN_PROGRESS         = 1,
  COMPLETED           = 2,
  CANCELLED_DEFAULT   = 3,
  CANCELLED_IMMEDIATE = 4
};
/* Trace level passed as the first argument to ipc_response_log () and
   ipc_request_log ().
   NOTE(review): presumably a higher level means more verbose output --
   confirm against the logging implementation.  */
enum IPCTraceLevel
{
  TRACE_LVL_0 = 0,
  TRACE_LVL_1 = 1,
  TRACE_LVL_2 = 2,
  TRACE_LVL_3 = 3,
  TRACE_LVL_4 = 4
};
/* One IPC request: a character buffer with a read cursor, tagged with a
   request id, a channel id and a processing status.  */
class IPCrequest
{
private:
  char *buf;			/* Bytes handed out by rgetc ().  */
  int size;			/* NOTE(review): presumably the length of buf
				   -- confirm against the constructor.  */
  int idx;			/* Index of the next byte rgetc () returns.  */
  int requestID;
  int channelID;
  IPCrequestStatus status;
  bool cancelImmediate;		/* Set by setCancelImmediate (); nothing in
				   this header clears it again.  */

public:
  /* NOTE(review): the three ints are presumably the buffer size, the request
     id and the channel id -- confirm against the out-of-line definition.  */
  IPCrequest (int, int, int);
  ~IPCrequest ();

  /* Defined out of line.  */
  IPCrequestStatus getStatus ();
  void setStatus (IPCrequestStatus);
  void read ();

  int
  getRequestID ()
  {
    return requestID;
  }

  int
  getChannelID ()
  {
    return channelID;
  }

  bool
  isCancelImmediate ()
  {
    return cancelImmediate;
  }

  void
  setCancelImmediate ()
  {
    cancelImmediate = true;
  }

  /* Return the byte at the cursor and advance the cursor by one.
     No bounds check is performed here; the caller must not read past the
     end of the buffer.  */
  char
  rgetc ()
  {
    char c = buf[idx];
    idx++;
    return c;
  }
};
/* One IPC response: request/channel ids, a response type and status, and a
   StringBuilder that the send*Val () methods are declared against.  */
class IPCresponse
{
public:
  IPCresponse (int sz);
  ~IPCresponse ();

  /* Identification of the request this response belongs to.  */
  int
  getRequestID ()
  {
    return requestID;
  }

  int
  getChannelID ()
  {
    return channelID;
  }

  void
  setRequestID (int r)
  {
    requestID = r;
  }

  void
  setChannelID (int c)
  {
    channelID = c;
  }

  /* The RESPONSE_TYPE_* and RESPONSE_STATUS_* macros name the values
     presumably expected here -- NOTE(review): confirm against callers.  */
  void
  setResponseType (int r)
  {
    responseType = r;
  }

  void
  setResponseStatus (int s)
  {
    responseStatus = s;
  }

  /* Current capacity of the underlying StringBuilder.  */
  int
  getCurBufSize ()
  {
    return sb->capacity ();
  }

  /* Typed appenders, defined out of line.  */
  void sendByte (int);
  void sendIVal (int);
  void sendLVal (long long);
  void sendDVal (double);
  void sendSVal (const char *);
  void sendBVal (bool);
  void sendCVal (char);
  void sendAVal (void *);

  void print (void);
  void reset ();

  /* Public link to another response.
     NOTE(review): presumably chains pooled responses inside BufferPool --
     confirm against the BufferPool implementation.  */
  IPCresponse *next;

private:
  int requestID;
  int channelID;
  int responseType;
  int responseStatus;
  StringBuilder *sb;
};
/* Hands out IPCresponse objects and takes them back for reuse.
   All member functions are defined out of line.  */
class BufferPool
{
public:
  BufferPool ();
  ~BufferPool ();

  /* NOTE(review): the int is presumably the requested buffer size (compare
     the BUFFER_SIZE_* macros) -- confirm against the definition.  */
  IPCresponse *getNewResponse (int);

  /* Return a response to the pool.  */
  void recycle (IPCresponse *);

private:
  /* NOTE(review): presumably serializes access to the two lists below --
     confirm against the implementation.  */
  pthread_mutex_t p_mutex;
  IPCresponse *smallBuf;
  IPCresponse *largeBuf;
};
// Read from the wire
int readInt (IPCrequest*);
bool readBoolean (IPCrequest*);
long long readLong (IPCrequest*);
DbeObj readObject (IPCrequest*);
Object readArray (IPCrequest*);
String readString (IPCrequest*);
void readRequestHeader ();
// write to the wire
void writeString (const char *, IPCrequest*);
void writeBoolean (bool, IPCrequest*);
void writeInt (int, IPCrequest*);
void writeChar (char, IPCrequest*);
void writeLong (long long, IPCrequest*);
void writeDouble (double, IPCrequest*);
void writeArray (void *, IPCrequest*);
void writeObject (DbeObj, IPCrequest*);
void writeResponseGeneric (int, int, int);
int setProgress (int, const char *); // Update the progress bar
int ipc_doWork (void *); // The argument is an IPCrequest
/* Global IPC state.  These are declarations only; the definitions live in
   another translation unit.  */
extern int ipc_flags;
/* NOTE(review): presumably non-zero selects processing without the thread
   pools below -- confirm against the code that reads it.  */
extern int ipc_single_threaded_mode;
/* Thread pools; DbeThreadPool is only forward-declared in this header.  */
extern DbeThreadPool *responseThreadPool;
extern DbeThreadPool *ipcThreadPool;
/* NOTE(review): presumably the channel id of a pending cancel request --
   confirm against the cancel handling code.  */
extern int cancelRequestedChannelID;
/* printf-style logging entry points.  The `format' attribute makes the
   compiler check the variadic arguments against FMT: the first number is the
   position of the format string, the second the position of the first
   variadic argument.  */
void ipc_default_log (const char *fmt, ...)
  __attribute__ ((format (printf, 1, 2)));

void ipc_response_log (IPCTraceLevel, const char *fmt, ...)
  __attribute__ ((format (printf, 2, 3)));

void ipc_request_log (IPCTraceLevel, const char *fmt, ...)
  __attribute__ ((format (printf, 2, 3)));
#endif /* _IPCIO_H */