Skip to content

Commit 6290bc8

Browse files
More cleanup for threading
1 parent 247741d commit 6290bc8

File tree

2 files changed

+60
-130
lines changed

2 files changed

+60
-130
lines changed

libs/ofxLibwebsockets/include/ofxLibwebsockets/Connection.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ namespace ofxLibwebsockets {
8282
//std::vector<unsigned char> buf;
8383

8484
// threading stuff
85-
vector<TextPacket> messages_text;
86-
vector<BinaryPacket> messages_binary;
85+
std::deque<TextPacket> messages_text;
86+
std::deque<BinaryPacket> messages_binary;
8787
};
8888

8989

libs/ofxLibwebsockets/src/Connection.cpp

Lines changed: 58 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -69,27 +69,11 @@ namespace ofxLibwebsockets {
6969
if ( message.size() == 0 ) return;
7070
int n = 0;
7171

72-
// size packet based on either bufferSize (max) or passed 'size' (whichever is smaller)
73-
int dataSize = bufferSize > message.size() ? message.size() : bufferSize;
74-
memcpy(&buf[LWS_SEND_BUFFER_PRE_PADDING], message.c_str(), dataSize );
75-
7672
// changed 3/6/15: buffer all messages to prevent threading errors
7773
TextPacket tp;
7874
tp.index = 0;
7975
tp.message = message;
8076
messages_text.push_back(tp);
81-
82-
// we have a nice small frame, just send it
83-
// } else {
84-
// if ( lws_partial_buffered(ws) == 0 ){
85-
// n = libwebsocket_write(ws, &buf[LWS_SEND_BUFFER_PRE_PADDING], message.size(), LWS_WRITE_TEXT);
86-
// } else {
87-
// n = -1;
88-
// }
89-
// }
90-
91-
if (n < 0)
92-
ofLogError() << "[ofxLibwebsockets] ERROR writing to socket" << std::endl;
9377
}
9478

9579
//--------------------------------------------------------------
@@ -105,10 +89,6 @@ namespace ofxLibwebsockets {
10589
//--------------------------------------------------------------
10690
void Connection::sendBinary( char * data, unsigned int size ){
10791
int n = -1;
108-
// size binary packet based on either bufferSize (max) or passed 'size' (whichever is smaller)
109-
int dataSize = bufferSize > size ? size : bufferSize;
110-
memcpy(&binaryBuf[LWS_SEND_BUFFER_PRE_PADDING], data, dataSize );
111-
11292

11393
// changed 3/6/15: buffer all messages to prevent threading errors
11494
// we have a big frame, so we need to send a few times
@@ -124,138 +104,88 @@ namespace ofxLibwebsockets {
124104
memcpy(bp.data, data, size);
125105

126106
messages_binary.push_back(bp);
127-
128-
n = 0;
129-
130-
// // we have a nice small frame, just send it
131-
// } else {
132-
// n = libwebsocket_write(ws, &binaryBuf[LWS_SEND_BUFFER_PRE_PADDING], dataSize, LWS_WRITE_BINARY);
133-
// }
134-
135-
if (n < 0){
136-
ofLogError() << "[ofxLibwebsockets] ERROR writing to socket" << std::endl;
137-
}
138107
}
139108

140109
//--------------------------------------------------------------
141110
void Connection::update(){
142111
// process standard ws messages
143112
if ( messages_text.size() > 0 && protocol->idle ){
113+
// grab first packet
144114
TextPacket & packet = messages_text[0];
145115

146-
if ( packet.index == 0 ){
147-
int dataSize = bufferSize > packet.message.size() ? packet.message.size() : bufferSize;
148-
memcpy(&buf[LWS_SEND_BUFFER_PRE_PADDING], packet.message.c_str(), dataSize );
116+
// either send a part of the message or just the message itself
117+
int dataSize = bufferSize > packet.message.size() ? packet.message.size() : bufferSize;
118+
119+
// if "start" set 'write text'; otherwise we're sending a continuation
120+
int writeMode = packet.index == 0 ? LWS_WRITE_TEXT : LWS_WRITE_CONTINUATION;
121+
122+
bool bDone = false;
123+
124+
// are we going to write the whole packet here?
125+
if ( packet.index + dataSize >= packet.message.size() ){
126+
dataSize = packet.message.size() - packet.index;
127+
bDone = true;
128+
} else {
129+
writeMode |= LWS_WRITE_NO_FIN; // add "we're not finished" flag
130+
}
131+
132+
// actual write to libwebsockets
133+
memcpy(&buf[LWS_SEND_BUFFER_PRE_PADDING], packet.message.c_str() + packet.index, dataSize );
134+
protocol->idle = false;
135+
136+
int n = libwebsocket_write(ws, &buf[LWS_SEND_BUFFER_PRE_PADDING], dataSize, (libwebsocket_write_protocol) writeMode );
137+
138+
if ( n < -1 ){
139+
ofLogError()<<"[ofxLibwebsockets] ERROR writing to socket";
140+
}
141+
142+
libwebsocket_callback_on_writable(context, ws);
143+
packet.index += dataSize;
144+
145+
// packet sent completed, erase front of dequeue
146+
if ( bDone ){
147+
messages_text.pop_front();
148+
}
149+
150+
} else if ( messages_text.size() > 0 && messages_text[0].index ){
151+
libwebsocket_callback_on_writable(context, ws);
152+
}
153+
154+
// process binary messages
155+
if ( messages_binary.size() > 0 && protocol->idle ){
156+
if ( messages_binary.size() > 0 ){
157+
BinaryPacket & packet = messages_binary[0];
158+
159+
int dataSize = bufferSize > packet.size ? packet.size : bufferSize;
160+
int writeMode = packet.index == 0 ? LWS_WRITE_BINARY : LWS_WRITE_CONTINUATION;
149161

150-
int writeMode = LWS_WRITE_TEXT;
151162
bool bDone = false;
152-
153-
if ( packet.index + dataSize >= packet.message.size() ){
154-
dataSize = packet.message.size() - packet.index;
163+
if ( packet.index + dataSize >= packet.size ){
164+
dataSize = packet.size - packet.index;
155165
bDone = true;
156166
} else {
157167
writeMode |= LWS_WRITE_NO_FIN; // add "we're not finished" flag
158168
}
159169

160-
protocol->idle = false;
161-
162-
int n = libwebsocket_write(ws, &buf[LWS_SEND_BUFFER_PRE_PADDING], dataSize, (libwebsocket_write_protocol) writeMode );
163-
164-
if ( n < -1 ){
165-
ofLogError()<<"[ofxLibwebsockets] ERROR writing to socket";
166-
}
167-
168-
libwebsocket_callback_on_writable(context, ws);
169-
packet.index = dataSize;
170-
171-
if ( bDone ){
172-
messages_text.erase(messages_text.begin());
173-
}
174-
175-
} else {
176-
// continue to send large message in chunks
177-
int dataSize = bufferSize > packet.message.size() ? packet.message.size() : bufferSize;
178-
int writeMode = LWS_WRITE_CONTINUATION;
179-
writeMode |= LWS_WRITE_NO_FIN;
170+
memcpy(&binaryBuf[LWS_SEND_BUFFER_PRE_PADDING], packet.data + packet.index, dataSize );
180171

181-
bool bDone = false;
182-
if ( packet.index + dataSize >= packet.message.size() ){
183-
dataSize = packet.message.size() - packet.index;
184-
writeMode = LWS_WRITE_CONTINUATION;
185-
bDone = true;
186-
}
172+
// this sets the protocol to wait until "idle"
173+
protocol->idle = false; // todo: this should be automatic on write!
187174

188-
memcpy(&buf[LWS_SEND_BUFFER_PRE_PADDING], packet.message.c_str() + packet.index, dataSize );
189-
protocol->idle = false;
190-
int n = libwebsocket_write(ws, &buf[LWS_SEND_BUFFER_PRE_PADDING], dataSize, (libwebsocket_write_protocol) writeMode );
175+
int n = libwebsocket_write(ws, &binaryBuf[LWS_SEND_BUFFER_PRE_PADDING], dataSize, (libwebsocket_write_protocol) writeMode );
191176
libwebsocket_callback_on_writable(context, ws);
192177
packet.index += dataSize;
193178

194-
if ( bDone ){
195-
messages_text.erase(messages_text.begin());
179+
if ( n < -1 ){
180+
ofLogError()<<"[ofxLibwebsockets] ERROR writing to socket";
196181
}
197-
}
198-
} else if ( messages_text.size() > 0 && messages_text[0].index ){
199-
libwebsocket_callback_on_writable(context, ws);
200-
}
201-
202-
// process binary messages
203-
if ( messages_binary.size() > 0 && protocol->idle ){
204-
if ( messages_binary.size() > 0 ){
205-
BinaryPacket & packet = messages_binary[0];
206182

207-
if ( packet.index == 0 ){
208-
// write beginning of packet
209-
int dataSize = bufferSize > packet.size ? packet.size : bufferSize;
210-
memcpy(&binaryBuf[LWS_SEND_BUFFER_PRE_PADDING], packet.data, dataSize );
211-
212-
int writeMode = LWS_WRITE_BINARY;
213-
214-
bool bDone = false;
215-
if ( packet.index + dataSize >= packet.size ){
216-
bDone = true;
217-
} else {
218-
writeMode |= LWS_WRITE_NO_FIN;
219-
}
220-
221-
protocol->idle = false;
222-
int n = libwebsocket_write(ws, &binaryBuf[LWS_SEND_BUFFER_PRE_PADDING], dataSize, (libwebsocket_write_protocol) writeMode );
223-
libwebsocket_callback_on_writable(context, ws);
224-
packet.index += dataSize;
225-
226-
if ( bDone ){
227-
free(packet.data);
228-
messages_binary.erase(messages_binary.begin());
229-
}
230-
231-
} else {
232-
// continue to send large message in chunks
233-
int dataSize = bufferSize > packet.size ? packet.size : bufferSize;
234-
int writeMode = LWS_WRITE_CONTINUATION;
235-
writeMode |= LWS_WRITE_NO_FIN;
236-
237-
bool bDone = false;
238-
if ( packet.index + dataSize >= packet.size ){
239-
dataSize = packet.size - packet.index;
240-
writeMode = LWS_WRITE_CONTINUATION;
241-
bDone = true;
242-
}
243-
244-
memcpy(&binaryBuf[LWS_SEND_BUFFER_PRE_PADDING], packet.data + packet.index, dataSize );
245-
246-
// this sets the protocol to wait until "idle"
247-
protocol->idle = false; // todo: this should be automatic on write!
248-
int n = libwebsocket_write(ws, &binaryBuf[LWS_SEND_BUFFER_PRE_PADDING], dataSize, (libwebsocket_write_protocol) writeMode );
249-
libwebsocket_callback_on_writable(context, ws);
250-
packet.index += dataSize;
251-
252-
if ( bDone ){
253-
free(packet.data);
254-
messages_binary.erase(messages_binary.begin());
255-
}
183+
if ( bDone ){
184+
free(packet.data);
185+
messages_binary.pop_front();
256186
}
257187
}
258-
} else if ( messages_text.size() > 0 && messages_text[0].index ){
188+
} else if ( messages_binary.size() > 0 && messages_binary[0].index ){
259189
libwebsocket_callback_on_writable(context, ws);
260190
}
261191
}

0 commit comments

Comments
 (0)