Branch data Line data Source code
1 : :
2 : : #include "gwrl/proactor.h"
3 : :
4 : : #ifdef __cplusplus
5 : : extern "C" {
6 : : #endif
7 : :
8 : : void
9 : 0 : io_activity(gwpr * pr, gwpr_io_info * info) {
10 : 0 : }
11 : :
12 : : void
13 : 201 : gwpr_src_activity(gwrl * rl, gwrlevt * evt) {
14 : 201 : if(evt->src->type == GWRL_SRC_TYPE_FILE) {
15 : 201 : char errfnc[16];
16 : 201 : gwpr * pr = _gwpr(rl->pr);
17 : 201 : gwrlsrc * src = evt->src;
18 : 201 : gwrlsrc_file * fsrc = _gwrlsrcf(evt->src);
19 : 201 : gwprdata * pdata = fsrc->pdata;
20 : 201 : gwpr_error_info errinfo = {0};
21 : 201 : gwpr_io_info ioinfo = {0};
22 : 201 : bzero(errfnc,sizeof(errfnc));
23 : :
24 [ + + ]: 201 : if(evt->flags & GWRL_RD) {
25 : 100 : int i = 0;
26 : 100 : ssize_t res = 0;
27 : 100 : gwprbuf * rd = NULL;
28 : :
29 [ - + ]: 100 : if(pdata->acceptcb) {
30 : : //an accept callback is set, assume they want us to
31 : : //try and accept as many clients up to gwpr_max_accept.
32 : 0 : ioinfo.src = fsrc;
33 [ # # ]: 0 : for(; i<pr->options.gwpr_max_accept; i++) {
34 : 0 : res = accept(evt->fd,_sockaddr(&(ioinfo.peer)),&(ioinfo.peerlen));
35 [ # # ]: 0 : if(res > -1) {
36 : 0 : ioinfo.peersrc = _gwrlsrcf(gwrl_src_file_create(res,0,NULL,NULL));
37 : 0 : pdata->acceptcb(pr,&ioinfo);
38 [ # # ]: 0 : } else if(res < 0) {
39 [ # # ]: 0 : if(errno != EWOULDBLOCK) {
40 [ # # ]: 0 : if(pdata->errorcb) {
41 : 0 : errinfo.src = fsrc;
42 : 0 : errinfo.errnm = errno;
43 : 0 : memcpy(errinfo.fnc,"accept\0",7);
44 : 0 : pdata->errorcb(pr,&errinfo);
45 : 0 : } else {
46 : 0 : gwprintsyserr("(09F6R) accept() error occured with no error callback.",errno);
47 : : }
48 : 0 : }
49 : 0 : }
50 : 0 : }
51 : 0 : }
52 : :
53 : : //clear info objects from past use
54 : 100 : bzero(&ioinfo,sizeof(ioinfo));
55 : 100 : bzero(&errinfo,sizeof(errinfo));
56 : :
57 [ + - ]: 100 : if(pdata->didreadcb) {
58 : : //did read callback is set so let's try and read data.
59 : :
60 : : //get or create a read buffer.
61 : 100 : rd = pdata->rdbuf;
62 [ + + ]: 199 : while(!rd) rd = gwpr_buf_get(pr,pdata->rdbufsize);
63 : 100 : pdata->rdbuf = NULL;
64 : 100 : ioinfo.src = fsrc;
65 : 100 : ioinfo.buf = rd;
66 : :
67 [ + - ]: 100 : if(pdata->rdop == gwpr_read_op_id) {
68 : 100 : ioinfo.op = gwpr_read_op_id;
69 : 100 : memcpy(errfnc,"read\0",5);
70 [ - + ][ - + ]: 100 : while((res=read(fsrc->fd,rd->buf,rd->bufsize)) < 0 && errno==EINTR);
71 : 100 : }
72 : :
73 [ # # ]: 0 : else if(pdata->rdop == gwpr_recvfrom_op_id) {
74 : 0 : ioinfo.op = gwpr_recvfrom_op_id;
75 : 0 : memcpy(errfnc,"recvfrom\0",9);
76 : 0 : ioinfo.peerlen = sizeof(ioinfo.peer);
77 [ # # ]: 0 : while((res=recvfrom(fsrc->fd,rd->buf,rd->bufsize,0,
78 [ # # ]: 0 : _sockaddr(&ioinfo.peer),&ioinfo.peerlen)) < 0 && errno == EINTR);
79 : 0 : }
80 : :
81 [ # # ]: 0 : else if(pdata->rdop == gwpr_recv_op_id) {
82 : 0 : ioinfo.op = gwpr_recv_op_id;
83 : 0 : memcpy(errfnc,"recv\0",5);
84 [ # # ][ # # ]: 0 : while((res=recv(fsrc->fd,rd->buf,rd->bufsize,0)) < 0 && errno == EINTR);
85 : 0 : }
86 : :
87 [ # # ]: 0 : else if(pdata->rdop == gwpr_recvmsg_op_id) {
88 : 0 : ioinfo.op = gwpr_recvmsg_op_id;
89 : 0 : memcpy(errfnc,"recvmsg\0",8);
90 [ # # ][ # # ]: 0 : while((res=recvmsg(fsrc->fd,(struct msghdr *)rd->buf,0)) < 0 && errno == EINTR);
91 : 0 : }
92 : :
93 [ + - ]: 100 : if(res > 0) {
94 : : //good read, call rd filters and did_read callback.
95 : :
96 : 100 : rd->len = res;
97 [ + - ][ - + ]: 100 : if(pdata->rdfilters && pdata->rdfilters[0] != NULL) {
98 [ # # ]: 0 : for(i=0; i<GWPR_FILTERS_MAX; i++) {
99 [ # # ]: 0 : if(!pdata->rdfilters[i]) break;
100 : 0 : pdata->rdfilters[i](pr,&ioinfo);
101 : 0 : }
102 : 0 : }
103 : :
104 : 100 : pdata->didreadcb(pr,&ioinfo);
105 : 100 : }
106 : :
107 [ # # ][ # # ]: 0 : else if(res == 0 && pdata->closedcb) {
108 : : //closed socket or eof from file
109 : 0 : pdata->closedcb(pr,&ioinfo);
110 : 0 : }
111 : :
112 [ # # ][ # # ]: 0 : else if(res < 0 && (errno == ECONNRESET || errno == EPIPE)
[ # # ][ # # ]
113 : : && pdata->closedcb) {
114 : : //closed socket or pipe error
115 : 0 : pdata->closedcb(pr,&ioinfo);
116 : 0 : }
117 : :
118 [ # # ][ # # ]: 0 : else if(res < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
[ # # ]
119 : : //socket or file marked as non blocking,
120 : : //no data to read, nothing to do.
121 : 0 : }
122 : :
123 [ # # ][ # # ]: 0 : else if(res < 0 && pdata->errorcb) {
124 : : //error we don't want to handle, pass to users.
125 : 0 : errinfo.errnm = errno;
126 : 0 : errinfo.src = fsrc;
127 : 0 : errinfo.op = ioinfo.op;
128 : 0 : memcpy(errinfo.fnc,errfnc,sizeof(errfnc));
129 : 0 : pdata->errorcb(pr,&errinfo);
130 : 0 : }
131 : :
132 : : else {
133 : 0 : gwprintsyserr("(OKL4F) proactor read error with no error callback",errno);
134 : : }
135 [ # # ][ # # ]: 100 : } else if(!pdata->didreadcb && !pdata->acceptcb) {
136 : 0 : gwerr("(4FL9KF) proactor can read data but there's no read callback set.");
137 : 0 : }
138 : 100 : }
139 : :
140 [ - + ]: 201 : if(evt->flags & GWRL_WR) {
141 : : //writable activity
142 : :
143 [ # # ]: 0 : if(pdata->connectcb) {
144 : : //the user wanted to know when a socket is writable after
145 : : //calling connect(), call it here and disable the connectcb.
146 : 0 : bzero(&ioinfo,sizeof(ioinfo));
147 : 0 : bzero(&errinfo,sizeof(errinfo));
148 : 0 : ioinfo.src = fsrc;
149 : 0 : ioinfo.peerlen = sizeof(ioinfo.peer);
150 : 0 : getpeername(fsrc->fd,_sockaddr(&ioinfo.peer),&ioinfo.peerlen);
151 : 0 : pdata->connectcb(pr,&ioinfo);
152 : 0 : pdata->connectcb = NULL;
153 : 0 : }
154 : :
155 [ # # ]: 0 : if(pdata->didwritecb) {
156 : : //write callback is set, let's try and write some data.
157 : 0 : int errnm = 0;
158 : 0 : bool stopwrite = false;
159 : 0 : bool freebuf = false;
160 : 0 : size_t written = 0;
161 : 0 : gwprwrq * q = pdata->wrq;
162 : 0 : gwprwrq * qn = NULL;
163 : 0 : gwprbuf * rembuf = NULL;
164 : 0 : pdata->wrq = NULL;
165 [ # # ]: 0 : if(pdata->didwritecb == &io_activity) freebuf = true;
166 : 0 : bzero(&ioinfo,sizeof(ioinfo));
167 : 0 : bzero(&errinfo,sizeof(errinfo));
168 : 0 : ioinfo.src = fsrc;
169 : :
170 [ # # ]: 0 : while(q) {
171 : : //loop over the gwprwrq objects associated with the file
172 : : //input source - this is all the data queued for writing.
173 : :
174 : 0 : qn = q->next;
175 : 0 : ioinfo.op = q->wrop;
176 : 0 : ioinfo.buf = q->buf;
177 [ # # ]: 0 : if(q->wrop == gwpr_sendto_op_id) {
178 : 0 : ioinfo.peerlen = q->peerlen;
179 : 0 : memcpy(&ioinfo.peer,&q->peer,q->peerlen);
180 : 0 : } else {
181 : 0 : ioinfo.peerlen = 0;
182 : 0 : bzero(&ioinfo.peer,sizeof(ioinfo.peer));
183 : : }
184 : :
185 [ # # ][ # # ]: 0 : if(pdata->wrfilters && pdata->wrfilters[0] != NULL) {
186 : : //call write filters before we send the data.
187 : 0 : int i = 0;
188 [ # # ]: 0 : for(; i< GWPR_FILTERS_MAX; i++) {
189 [ # # ]: 0 : if(!pdata->wrfilters[i]) break;
190 : 0 : pdata->wrfilters[i](pr,&ioinfo);
191 : 0 : }
192 : 0 : }
193 : :
194 : : //update the error fnc in case we need it.
195 [ # # ]: 0 : if(q->wrop == gwpr_write_op_id) memcpy(errfnc,"write\0",6);
196 [ # # ]: 0 : else if(q->wrop == gwpr_sendto_op_id) memcpy(errfnc,"sendto\0",7);
197 [ # # ]: 0 : else if(q->wrop == gwpr_send_op_id) memcpy(errfnc,"send\0",5);
198 [ # # ]: 0 : else if(q->wrop == gwpr_sendmsg_op_id) memcpy(errfnc,"sendmsg\0",8);
199 : :
200 : : //perform the write.
201 : 0 : gwpr_write_buffer(pr,fsrc,q->buf,q->wrop,&q->peer,q->peerlen,&written,&errnm);
202 : :
203 [ # # ]: 0 : if(written == q->buf->len) {
204 : : //successfully wrote all data.
205 : 0 : pdata->didwritecb(pr,&ioinfo);
206 : 0 : }
207 : :
208 [ # # ]: 0 : else if(written < q->buf->len) {
209 : : //partial write. the unwritten data is copied to a new
210 : : //buffer to put back in the write queue. notify the user
211 : : //of the partial write - only the written data is reported.
212 : 0 : size_t remain = q->buf->len - written;
213 : 0 : rembuf = gwpr_buf_get(pr,remain);
214 [ # # ]: 0 : while(!rembuf) rembuf = gwpr_buf_get(pr,remain);
215 [ # # ]: 0 : memcpy(rembuf->buf,q->buf->buf+written,remain);
216 : 0 : q->buf->len = written;
217 : 0 : pdata->didwritecb(pr,&ioinfo);
218 : :
219 [ # # ][ # # ]: 0 : if(errnm == 0 || errnm == EWOULDBLOCK || errnm == EAGAIN) {
[ # # ]
220 : : //all is ok, but since it's a partial write the
221 : : //queue has to be put back for writing again later
222 : 0 : gwprwrq * rembufq = gwprwrq_get(pr,fsrc);
223 [ # # ]: 0 : memcpy(&rembufq->peer,&q->peer,sizeof(rembufq->peer));
224 : 0 : rembufq->peerlen = q->peerlen;
225 : 0 : rembufq->buf = rembuf;
226 : 0 : rembufq->wrop = q->wrop;
227 : 0 : rembufq->next = q->next;
228 : 0 : gwprwrq_putback(pr,fsrc,rembufq);
229 : 0 : break;
230 : : }
231 : 0 : }
232 : :
233 [ # # ][ # # ]: 0 : if(errnm == ECONNRESET || errnm == EPIPE) {
234 : : //closed connection or pipe write with no read side connected.
235 : : //buf is nulled out because we should only give the user data
236 : : //that had been unwritten in the case of errors or closed fd.
237 [ # # ]: 0 : if(written > 0) ioinfo.buf = rembuf;
238 : 0 : else ioinfo.buf = NULL;
239 [ # # ]: 0 : if(pdata->closedcb) pdata->closedcb(pr,&ioinfo);
240 : 0 : break;
241 : : }
242 : :
243 [ # # ][ # # ]: 0 : else if(written == 0 && (errnm == EWOULDBLOCK || errnm == EAGAIN)) {
[ # # ]
244 : : //socket or file is non blocking and this would block.
245 : : //the socket or file is not-writable so restore the write
246 : : //queue starting from this q object for writing later.
247 : 0 : gwprwrq_putback(pr,fsrc,q);
248 : 0 : break;
249 : : }
250 : :
251 [ # # ][ # # ]: 0 : else if(errnm != 0 && pdata->errorcb) {
252 : : //error we don't want to handle. pass to the user.
253 : 0 : errinfo.errnm = errno;
254 : 0 : errinfo.src = fsrc;
255 : 0 : errinfo.op = q->wrop;
256 : 0 : errinfo.buf = q->buf;
257 [ # # ]: 0 : if(written > 0) errinfo.buf = rembuf;
258 : 0 : memcpy(errinfo.fnc,errfnc,sizeof(errfnc));
259 : 0 : pdata->errorcb(pr,&errinfo);
260 : 0 : stopwrite = true;
261 : 0 : gwprwrq_putback(pr,fsrc,q);
262 : 0 : break;
263 : : }
264 : :
265 [ # # ]: 0 : if(freebuf) {
266 : : //there was no user provided did_write callback set, so
267 : : //the buffer that's associated with a write queue item will
268 : : //get lost unless freed here.
269 : 0 : gwpr_buf_free(pr,q->buf);
270 : 0 : }
271 : :
272 : : //give back the gwprwrq
273 : 0 : gwprwrq_free(pr,fsrc,q);
274 : 0 : q = qn;
275 : 0 : }
276 : :
277 [ # # ][ # # ]: 0 : if(!pdata->wrq || stopwrite) {
278 : : //stop write events since there's nothing in the queue
279 : 0 : gwrlsrc_flags_t flags = src->flags;
280 : 0 : flclr(flags,GWRL_WR);
281 : 0 : gwrl_src_file_update_flags(pr->rl,src,flags);
282 : 0 : }
283 : 0 : }
284 : 0 : }
285 : :
286 [ + + ]: 201 : else if(evt->flags & GWRL_SYNC_WRITE) {
287 [ + - ]: 101 : if(pdata->didwritecb) {
288 : : //a synchronous write completed, notify the user here.
289 : 101 : gwprwrq * q = (gwprwrq *)evt->userdata;
290 : 101 : ioinfo.src = fsrc;
291 : 101 : ioinfo.buf = q->buf;
292 : 101 : ioinfo.op = q->wrop;
293 [ + - ]: 101 : if(q->wrop == gwpr_write_op_id) {
294 : 101 : memcpy(&ioinfo.peer,&q->peer,sizeof(ioinfo.peer));
295 : 101 : ioinfo.peerlen = q->peerlen;
296 : 101 : } else {
297 : 0 : bzero(&ioinfo.peer,sizeof(ioinfo.peer));
298 : 0 : ioinfo.peerlen = 0;
299 : : }
300 : 101 : pdata->didwritecb(pr,&ioinfo);
301 [ - + ]: 101 : if(pdata->didwritecb == &io_activity) gwpr_buf_free(pr,ioinfo.buf);
302 : 101 : gwprwrq_free(pr,fsrc,q);
303 : 101 : } else {
304 : 0 : gwerr("(38FG7) proactor synchronous write completed with no did_write callback set.");
305 : : }
306 : 201 : }
307 : :
308 [ - + ]: 201 : if(evt->flags & GWRL_SYNC_CLOSE) {
309 [ # # ]: 0 : if(pdata->closedcb) {
310 : : //a synchronous write discovered socket or file close.
311 : 0 : bzero(&ioinfo,sizeof(ioinfo));
312 : 0 : ioinfo.src = fsrc;
313 : 0 : pdata->closedcb(pr,&ioinfo);
314 : 0 : } else {
315 : 0 : gwerr("(7GK9F) proactor detected a closed socket or file with no closed callback set.");
316 : : }
317 : 0 : }
318 : :
319 [ - + ]: 201 : else if(evt->flags & GWRL_SYNC_ERROR) {
320 [ # # ]: 0 : if(pdata->errorcb) {
321 : : //a synchronous write had an error.
322 : 0 : gwpr_error_info * errinfo = (gwpr_error_info *)evt->userdata;
323 : 0 : pdata->errorcb(pr,errinfo);
324 : 0 : free(errinfo);
325 : 0 : } else {
326 : 0 : gwerr("(0LOP4) proactor encountered a synchronous write error with no error callback set.");
327 : : }
328 : 201 : }
329 : 201 : }
330 : 201 : }
331 : :
332 : : void
333 : 1 : gwpr_free(gwpr * pr) {
334 : 1 : int i = 0;
335 : 1 : gwrlsrc * src = NULL;
336 : 1 : gwrlsrc_file * fsrc = NULL;
337 [ + + ]: 4 : for(; i<GWRL_SRC_TYPES_COUNT; i++) {
338 : 3 : src = pr->rl->sources[i];
339 [ + + ]: 5 : while(src) {
340 [ + - ]: 2 : if(src->type == GWRL_SRC_TYPE_FILE) {
341 : 2 : gwrl_src_disable(pr->rl,src);
342 : 2 : fsrc = _gwrlsrcf(src);
343 : 2 : src->callback = NULL;
344 [ + + ]: 2 : if(fsrc->pdata) {
345 : 1 : gwprdata * pdata = fsrc->pdata;
346 [ + - ]: 1 : if(pdata->rdfilters) {
347 : 1 : free(pdata->rdfilters);
348 : 1 : pdata->rdfilters = NULL;
349 : 1 : }
350 [ + - ]: 1 : if(pdata->wrfilters) {
351 : 1 : free(pdata->wrfilters);
352 : 1 : pdata->wrfilters = NULL;
353 : 1 : }
354 [ + - ]: 1 : if(pdata->rdbuf) {
355 : 1 : gwpr_buf_free(pr,pdata->rdbuf);
356 : 1 : pdata->rdbuf = NULL;
357 : 1 : }
358 [ - + ]: 1 : if(pdata->wrq) {
359 : 0 : gwprwrq_free_list_no_cache(pr,pdata->wrq);
360 : 0 : pdata->wrq = NULL;
361 : 0 : pdata->wrqlast = NULL;
362 : 0 : }
363 : 1 : free(fsrc->pdata);
364 : 1 : fsrc->pdata = NULL;
365 : 1 : }
366 : 2 : }
367 : 2 : src = src->next;
368 : 2 : }
369 : 3 : }
370 [ - + ]: 1 : if(pr->wrqcache) {
371 : 0 : gwprwrq_free_list_no_cache(pr,pr->wrqcache);
372 : 0 : }
373 [ + - ]: 1 : if(pr->bufctl) {
374 : 1 : free(pr->bufctl);
375 : 1 : }
376 : 1 : pr->rl->pr = NULL;
377 : 1 : pr->rl = NULL;
378 : 1 : free(pr);
379 : 1 : }
380 : :
381 : : void
382 : 101 : gwpr_write_buffer(gwpr * pr, gwrlsrc_file * fsrc, gwprbuf * buf,
383 : 101 : gwpr_io_op_id op, struct sockaddr_storage * peer, socklen_t peerlen,
384 : 101 : size_t * written, int * errnm) {
385 : 101 : ssize_t didwrite = 0;
386 : 101 : size_t towrite = buf->len;
387 : 101 : char * _buf = buf->buf;
388 : :
389 [ + + ]: 202 : while(towrite > 0) {
390 : 101 : *errnm = 0;
391 : :
392 [ + - ]: 101 : if(op == gwpr_write_op_id) {
393 [ + - ][ - + ]: 101 : while((didwrite=write(fsrc->fd,_buf,towrite)) && didwrite < 0 && (errno == EINTR));
[ - + ]
394 [ # # ]: 101 : } else if(op == gwpr_send_op_id) {
395 [ # # ][ # # ]: 0 : while((didwrite=send(fsrc->fd,_buf,towrite,0)) && didwrite < 0 && (errno == EINTR));
[ # # ]
396 [ # # ]: 0 : } else if(op == gwpr_sendto_op_id) {
397 [ # # ][ # # ]: 0 : while((didwrite=sendto(fsrc->fd,_buf,towrite,0, _sockaddr(peer),peerlen)) && didwrite < 0 && (errno == EINTR));
[ # # ]
398 [ # # ]: 0 : } else if(op == gwpr_sendmsg_op_id) {
399 [ # # ][ # # ]: 0 : while((didwrite=sendmsg(fsrc->fd,(struct msghdr *)_buf,0)) && didwrite < 0 && (errno == EINTR));
[ # # ]
400 : 0 : }
401 : :
402 [ + - ]: 101 : if(didwrite > 0) {
403 : 101 : *written += didwrite;
404 : 101 : towrite -= didwrite;
405 : 101 : _buf += didwrite;
406 [ # # ]: 101 : } else if(didwrite < 0) {
407 : 0 : *errnm = errno;
408 : 0 : break;
409 : : }
410 : 101 : }
411 : 101 : }
412 : :
413 : : bool
414 : 101 : gwpr_synchronous_write(gwpr * pr, gwrlsrc_file * fsrc, gwprbuf * buf,
415 : 101 : gwpr_io_op_id op, struct sockaddr_storage * peer, socklen_t peerlen) {
416 : 101 : bool didwr = false;
417 : :
418 : : #if defined(GWPR_TRY_SYNCHRONOUS_WRITE_UNIX)
419 : :
420 : 101 : if(buf->len <= GWPR_SYNCHRONOUS_WRITE_MAX_BYTES) {
421 : : //the buffer requested is smaller than the maximum allowed
422 : : //bytes to allow synchronous writes with, so give it a shot.
423 : :
424 : : //setup vars
425 : 101 : gwrlsrc * src = _gwrlsrc(fsrc);
426 : 101 : gwprdata * pdata = fsrc->pdata;
427 : 101 : gwprbuf * usedbuf = buf;
428 : 101 : gwprbuf * rembuf = NULL;
429 : 101 : gwprwrq * q = NULL;
430 : 101 : gwrlevt * evt = NULL;
431 : :
432 [ + - ][ - + ]: 101 : if(pdata->wrfilters && pdata->wrfilters[0] != NULL) {
433 : : //call write filters
434 : :
435 : : //copy buffer before calling write filters, this is in case
436 : : //everything fails and we need to queue writing for later.
437 : 0 : gwprbuf * bfcp = gwpr_buf_get(pr,buf->bufsize);
438 [ # # ]: 0 : while(!bfcp) bfcp = gwpr_buf_get(pr,buf->bufsize);
439 : 0 : bfcp->len = buf->len;
440 [ # # ]: 0 : memcpy(bfcp->buf,buf->buf,buf->len);
441 : 0 : usedbuf = bfcp;
442 : :
443 : : //setup ioinfo for write filter
444 : 0 : gwpr_io_info ioinfo = {0};
445 : 0 : ioinfo.src = fsrc;
446 : 0 : ioinfo.buf = usedbuf;
447 : 0 : ioinfo.op = op;
448 [ # # ]: 0 : if(op == gwpr_sendto_op_id) {
449 : 0 : ioinfo.peerlen = peerlen;
450 : 0 : memcpy(&ioinfo.peer,peer,sizeof(ioinfo.peer));
451 : 0 : }
452 : :
453 : : //call filters
454 : 0 : int i = 0;
455 [ # # ]: 0 : for(; i<GWPR_FILTERS_MAX; i++) {
456 [ # # ]: 0 : if(!pdata->wrfilters[i]) break;
457 : 0 : pdata->wrfilters[i](pr,&ioinfo);
458 : 0 : }
459 : 0 : }
460 : :
461 : : //write the buffer
462 : 101 : int errnm = 0;
463 : 101 : size_t written = 0;
464 : 101 : gwpr_write_buffer(pr,fsrc,usedbuf,gwpr_write_op_id,peer,peerlen,&written,&errnm);
465 : :
466 [ + - ]: 101 : if(written == usedbuf->len) {
467 : : //fullwrite success
468 : :
469 : : //get a wrq for the event userdata
470 : 101 : q = gwprwrq_get(pr,fsrc);
471 : :
472 : : //get an event to post
473 : 101 : evt = gwrl_evt_create(pr->rl,src,&gwpr_src_activity,q,fsrc->fd,GWRL_SYNC_WRITE);
474 [ - + ]: 101 : while(!evt) evt = gwrl_evt_create(pr->rl,src,&gwpr_src_activity,q,fsrc->fd,GWRL_SYNC_WRITE);
475 : :
476 : : //post the event back to the proactor to catch.
477 : 101 : q->buf = usedbuf;
478 : 101 : q->wrop = op;
479 [ - + ]: 101 : if(op == gwpr_sendto_op_id) {
480 : 0 : q->peerlen = peerlen;
481 [ # # ]: 0 : memcpy(&q->peer,peer,sizeof(q->peer));
482 : 0 : }
483 : 101 : gwrl_post_evt(pr->rl,evt);
484 : :
485 : : //set result
486 : 101 : didwr = true;
487 [ # # ]: 101 : } else if(written < usedbuf->len) {
488 : : //partial write, copy the unwritten data to a new buffer.
489 : : //post a write event back to the proactor and queue the
490 : : //remaining data buffer for writing.
491 : :
492 : : //copy the remaining buffer data that hasn't been
493 : : //written to requeue later.
494 : 0 : size_t remaining = usedbuf->len - written;
495 : 0 : rembuf = gwpr_buf_get(pr,remaining);
496 [ # # ]: 0 : while(!rembuf) rembuf = gwpr_buf_get(pr,remaining);
497 [ # # ]: 0 : memcpy(rembuf->buf,usedbuf->buf + written,remaining);
498 : :
499 : : //get a wrq for the user data
500 : 0 : gwprwrq * q = gwprwrq_get(pr,fsrc);
501 : 0 : q->buf = usedbuf;
502 : 0 : q->wrop = op;
503 : :
504 : : //update the used buffer to reflect what was actually written.
505 : : //even though the remaining unwritten buffer data is still in
506 : : //the buffer it won't matter. users should only use what's in
507 : : //the len property.
508 : 0 : usedbuf->len = written;
509 : :
510 : : //update wrq for udp
511 [ # # ]: 0 : if(op == gwpr_sendto_op_id) {
512 : 0 : q->peerlen = peerlen;
513 [ # # ]: 0 : memcpy(&q->peer,peer,sizeof(q->peer));
514 : 0 : }
515 : :
516 : : //get event and post it
517 : 0 : gwrlevt * evt = gwrl_evt_create(pr->rl,_gwrlsrc(fsrc),&gwpr_src_activity,q,fsrc->fd,GWRL_SYNC_WRITE);
518 [ # # ]: 0 : while(!evt) evt = gwrl_evt_create(pr->rl,_gwrlsrc(fsrc),&gwpr_src_activity,q,fsrc->fd,GWRL_SYNC_WRITE);
519 : 0 : gwrl_post_evt(pr->rl,evt);
520 : :
521 [ # # ][ # # ]: 0 : if(errnm == 0 || errnm == EWOULDBLOCK || errnm == EAGAIN) {
[ # # ]
522 : : //either no errors, or the partial write errored out by
523 : : //the kernel telling us there's no more room to write.
524 : : //post the remaining buffer data to the write queue for later.
525 : :
526 : 0 : gwpr_asynchronous_write(pr,fsrc,buf,op,peer,peerlen);
527 : 0 : }
528 : :
529 : : //set result
530 : 0 : didwr = true;
531 : 0 : } else {
532 : 0 : didwr = false;
533 : : }
534 : :
535 [ + - ][ - + ]: 101 : if(errnm == ECONNRESET || errnm == EPIPE) {
536 : : //closed file descriptor or eof, post close event back to
537 : : //proactor. for dispatching to the user. If there was a partial
538 : : //write eariler we pass back the remaining data buffer in
539 : : //case the user needs to handle it.
540 : :
541 : : //get a wrq for user data
542 : 0 : q = gwprwrq_get(pr,fsrc);
543 [ # # ]: 0 : if(written > 0) q->buf = rembuf;
544 : 0 : q->wrop = op;
545 : :
546 : : //get event and post it
547 : 0 : evt = gwrl_evt_create(pr->rl,_gwrlsrc(fsrc),&gwpr_src_activity,q,fsrc->fd,GWRL_SYNC_CLOSE);
548 [ # # ]: 0 : while(!evt) evt = gwrl_evt_create(pr->rl,_gwrlsrc(fsrc),&gwpr_src_activity,q,fsrc->fd,GWRL_SYNC_CLOSE);
549 : 0 : gwrl_post_evt(pr->rl,evt);
550 : :
551 : : //set result
552 : 0 : didwr = true;
553 : 0 : }
554 : :
555 [ - + ][ # # ]: 101 : else if(written == 0 && (errnm == EWOULDBLOCK || errnm == EAGAIN)) {
[ # # ]
556 : : //socket or fd is marked as non-blocking and no data could
557 : : //be written without blocking, post it all in the queue for
558 : : //writing later when the file descriptor is writable.
559 : 0 : gwpr_asynchronous_write(pr,fsrc,buf,op,peer,peerlen);
560 : 0 : }
561 : :
562 [ - + ]: 101 : else if(errnm != 0) {
563 : : //other error we don't want to handle. post this back
564 : : //to the proactor for dispatching to the user.
565 : :
566 : : //create err info for the event.
567 : 0 : gwpr_error_info * errinfo = calloc(1,sizeof(gwpr_error_info));
568 : 0 : errinfo->errnm = errnm;
569 : 0 : errinfo->src = fsrc;
570 : 0 : errinfo->op = op;
571 : 0 : errinfo->buf = buf;
572 [ # # ]: 0 : if(written > 0) errinfo->buf = rembuf;
573 [ # # ][ # # ]: 0 : if(op == gwpr_write_op_id) memcpy(errinfo->fnc,"write\0",6);
574 [ # # ][ # # ]: 0 : else if(op == gwpr_send_op_id) memcpy(errinfo->fnc,"send\0",5);
575 [ # # ][ # # ]: 0 : else if(op == gwpr_sendto_op_id) memcpy(errinfo->fnc,"sendto\0",7);
576 [ # # ][ # # ]: 0 : else if(op == gwpr_sendmsg_op_id) memcpy(errinfo->fnc,"sendmsg\0",8);
577 : :
578 : : //get an event and post it
579 : 0 : evt = gwrl_evt_create(pr->rl,src,&gwpr_src_activity,errinfo,fsrc->fd,GWRL_SYNC_ERROR);
580 [ # # ]: 0 : while(!evt) evt = gwrl_evt_create(pr->rl,src,&gwpr_src_activity,errinfo,fsrc->fd,GWRL_SYNC_ERROR);
581 : 0 : gwrl_post_evt(pr->rl,evt);
582 : 101 : }
583 : 101 : }
584 : :
585 : : #endif
586 : 101 : return didwr;
587 : : }
588 : :
589 : : void
590 : 0 : gwpr_asynchronous_write(gwpr * pr, gwrlsrc_file * fsrc, gwprbuf * buf,
591 : 0 : gwpr_io_op_id op, struct sockaddr_storage * peer, socklen_t peerlen) {
592 : 0 : gwrlsrc * src = _gwrlsrc(fsrc);
593 : 0 : gwprdata * pdata = fsrc->pdata;
594 : 0 : gwprwrq * q = gwprwrq_get(pr,fsrc);
595 : 0 : gwrlsrc_flags_t flags = src->flags;
596 : 0 : if(!pdata->didwritecb) pdata->didwritecb = &io_activity;
597 : 0 : q->buf = buf;
598 : 0 : q->wrop = op;
599 [ # # ]: 0 : if(op == gwpr_sendto_op_id) {
600 : 0 : q->peerlen = peerlen;
601 [ # # ]: 0 : memcpy(&q->peer,peer,sizeof(q->peer));
602 : 0 : }
603 : 0 : gwprwrq_add(pr,fsrc,q);
604 : 0 : flset(flags,GWRL_ENABLED|GWRL_WR);
605 : 0 : gwrl_src_file_update_flags(pr->rl,src,flags);
606 : 0 : }
607 : :
608 : : int
609 : 0 : gwpr_accept(gwpr * pr, gwrlsrc_file * fsrc) {
610 : 0 : gwrlsrc * src = _gwrlsrc(fsrc);
611 : 0 : gwprdata * pdata = fsrc->pdata;
612 : 0 : gwrlsrc_flags_t flags = src->flags;
613 : 0 : if(!pdata->acceptcb) {
614 : 0 : gwerr("(6NMC4) cannot call accept without an accept callback set.");
615 : 0 : return -1;
616 : : }
617 : 0 : flset(flags,GWRL_ENABLED|GWRL_RD);
618 : 0 : gwrl_src_file_update_flags(pr->rl,src,flags);
619 : 0 : return 0;
620 : 0 : }
621 : :
622 : : int
623 : 0 : gwpr_connect(gwpr * pr, gwrlsrc_file * fsrc,
624 : 0 : struct sockaddr_storage * addr) {
625 : 0 : int res = 0;
626 : 0 : gwrlsrc * src = _gwrlsrc(fsrc);
627 : 0 : gwprdata * prdata = (gwprdata *)fsrc->pdata;
628 : 0 : gwrlsrc_flags_t flags = src->flags;
629 : 0 : if(!prdata->connectcb) {
630 : 0 : gwerr("(8rFG3) cannot call connect without a connect callback set.");
631 : 0 : return -1;
632 : : }
633 : 0 : gwsk_nonblock(fsrc->fd,1);
634 : 0 : res = gwsk_connect(fsrc->fd,addr);
635 : 0 : flset(flags,GWRL_ENABLED|GWRL_WR|GWRL_RD);
636 : 0 : gwrl_src_file_update_flags(pr->rl,src,flags);
637 : 0 : return res;
638 : 0 : }
639 : :
640 : : int
641 : 2 : gwpr_asynchronous_read(gwpr * pr, gwrlsrc_file * fsrc, gwprbuf * buf,
642 : 2 : gwpr_io_op_id op) {
643 : 2 : gwrlsrc * src = _gwrlsrc(fsrc);
644 : 2 : gwprdata * pdata = (gwprdata *)fsrc->pdata;
645 : 2 : gwrlsrc_flags_t flags = src->flags;
646 : 2 : if(!pdata->didreadcb) {
647 : 0 : gwerr("(7GB3H) cannot call recvfrom without a did read callback.");
648 : 0 : return -1;
649 : : }
650 : 2 : flset(flags,GWRL_ENABLED|GWRL_RD);
651 : 2 : pdata->rdbuf = buf;
652 : 2 : pdata->rdbufsize = buf->bufsize;
653 : 2 : pdata->rdop = op;
654 : 2 : gwrl_src_file_update_flags(pr->rl,src,flags);
655 : 2 : return 0;
656 : 2 : }
657 : :
658 : : void
659 : 0 : gwpr_stop_read(gwpr * pr, gwrlsrc_file * fsrc) {
660 : 0 : gwrlsrc * src = _gwrlsrc(fsrc);
661 : 0 : gwrlsrc_flags_t flags = src->flags;
662 : 0 : flclr(flags,GWRL_RD);
663 : 0 : gwrl_src_file_update_flags(pr->rl,src,flags);
664 : 0 : }
665 : :
666 : : int
667 : 2 : gwpr_read(gwpr * pr, gwrlsrc_file * fsrc, gwprbuf * buf) {
668 : 2 : return gwpr_asynchronous_read(pr,fsrc,buf,gwpr_read_op_id);
669 : : }
670 : :
671 : : int
672 : 0 : gwpr_recv(gwpr * pr, gwrlsrc_file * fsrc, gwprbuf * buf) {
673 : 0 : return gwpr_asynchronous_read(pr,fsrc,buf,gwpr_recv_op_id);
674 : : }
675 : :
676 : : int
677 : 0 : gwpr_recvfrom(gwpr * pr, gwrlsrc_file * fsrc, gwprbuf * buf) {
678 : 0 : return gwpr_asynchronous_read(pr,fsrc,buf,gwpr_recvfrom_op_id);
679 : : }
680 : :
681 : : int
682 : 0 : gwpr_recvmsg(gwpr * pr, gwrlsrc_file * fsrc, gwprbuf * buf) {
683 : 0 : return gwpr_asynchronous_read(pr,fsrc,buf,gwpr_recvmsg_op_id);
684 : : }
685 : :
686 : : int
687 : 101 : gwpr_write(gwpr * pr, gwrlsrc_file * fsrc, gwprbuf * buf) {
688 : 101 : gwprdata * pdata = fsrc->pdata;
689 : 101 : if(!pdata->didwritecb) {
690 : 0 : pdata->didwritecb = &io_activity;
691 : 0 : }
692 [ - + ]: 101 : if(pdata->wrq) {
693 : 0 : gwpr_asynchronous_write(pr,fsrc,buf,gwpr_write_op_id,NULL,0);
694 : 0 : return 0;
695 : : }
696 [ - + ]: 101 : if(!(gwpr_synchronous_write(pr,fsrc,buf,gwpr_write_op_id,NULL,0))) {
697 : 0 : gwpr_asynchronous_write(pr,fsrc,buf,gwpr_write_op_id,NULL,0);
698 : 0 : }
699 : 101 : return 0;
700 : 101 : }
701 : :
702 : : int
703 : 0 : gwpr_send(gwpr * pr, gwrlsrc_file * fsrc, gwprbuf * buf) {
704 : 0 : gwprdata * pdata = fsrc->pdata;
705 : 0 : if(!pdata->didwritecb) pdata->didwritecb = &io_activity;
706 [ # # ]: 0 : if(pdata->wrq) {
707 : 0 : gwpr_asynchronous_write(pr,fsrc,buf,gwpr_send_op_id,NULL,0);
708 : 0 : return 0;
709 : : }
710 [ # # ]: 0 : if(!(gwpr_synchronous_write(pr,fsrc,buf,gwpr_send_op_id,NULL,0))) {
711 : 0 : gwpr_asynchronous_write(pr,fsrc,buf,gwpr_send_op_id,NULL,0);
712 : 0 : }
713 : 0 : return 0;
714 : 0 : }
715 : :
716 : : int
717 : 0 : gwpr_sendto(gwpr * pr, gwrlsrc_file * fsrc, gwprbuf * buf,
718 : 0 : struct sockaddr_storage * peer, socklen_t peerlen) {
719 : 0 : gwprdata * pdata = fsrc->pdata;
720 : 0 : if(!pdata->didwritecb) pdata->didwritecb = &io_activity;
721 [ # # ]: 0 : if(pdata->wrq) {
722 : 0 : gwpr_asynchronous_write(pr,fsrc,buf,gwpr_sendto_op_id,peer,peerlen);
723 : 0 : return 0;
724 : : }
725 [ # # ]: 0 : if(!(gwpr_synchronous_write(pr,fsrc,buf,gwpr_sendto_op_id,peer,peerlen))) {
726 : 0 : gwpr_asynchronous_write(pr,fsrc,buf,gwpr_sendto_op_id,peer,peerlen);
727 : 0 : }
728 : 0 : return 0;
729 : 0 : }
730 : :
731 : : int
732 : 0 : gwpr_sendmsg(gwpr * pr, gwrlsrc_file * fsrc, gwprbuf * buf) {
733 : 0 : gwprdata * pdata = fsrc->pdata;
734 : 0 : if(!pdata->didwritecb) pdata->didwritecb = &io_activity;
735 [ # # ]: 0 : if(pdata->wrq) {
736 : 0 : gwpr_asynchronous_write(pr,fsrc,buf,gwpr_sendmsg_op_id,NULL,0);
737 : 0 : return 0;
738 : : }
739 [ # # ]: 0 : if(!(gwpr_synchronous_write(pr,fsrc,buf,gwpr_sendmsg_op_id,NULL,0))) {
740 : 0 : gwpr_asynchronous_write(pr,fsrc,buf,gwpr_sendmsg_op_id,NULL,0);
741 : 0 : }
742 : 0 : return 0;
743 : 0 : }
744 : :
745 : : gwprwrq *
746 : 173 : gwprwrq_get(gwpr * pr, gwrlsrc_file * fsrc) {
747 : 173 : gwprwrq * wrq = NULL;
748 : 173 : if(pr->wrqcache) {
749 : 100 : wrq = pr->wrqcache;
750 : 100 : pr->nwrqcache--;
751 [ - + ]: 100 : if(wrq->next) {
752 : 0 : pr->wrqcache = wrq->next;
753 : 0 : } else {
754 : 100 : pr->wrqcache = NULL;
755 : : }
756 : 100 : } else {
757 : 73 : wrq = gwrl_mem_calloc(1,sizeof(gwprwrq));
758 [ - + ]: 73 : while(!wrq) {
759 : 0 : gwerr("(9DXIL) calloc error");
760 : 0 : wrq = gwrl_mem_calloc(1,sizeof(gwprwrq));
761 : 73 : }
762 : : }
763 : 173 : wrq->next = NULL;
764 : 173 : return wrq;
765 : : }
766 : :
767 : : void
768 : 155 : gwprwrq_free(gwpr * pr, gwrlsrc_file * fsrc, gwprwrq * wrq) {
769 : 155 : if(pr->nwrqcache == pr->options.gwpr_wrqueue_cache_max) {
770 : 0 : free(wrq);
771 : 0 : } else {
772 : 155 : wrq->next = NULL;
773 : 155 : wrq->buf = NULL;
774 [ + + ]: 155 : if(pr->wrqcache) wrq->next = pr->wrqcache;
775 : 155 : pr->wrqcache = wrq;
776 : 155 : pr->nwrqcache++;
777 : : }
778 : 155 : }
779 : :
780 : : void
781 : 0 : gwprwrq_free_list_no_cache(gwpr * pr, gwprwrq * wrq) {
782 : 0 : gwprwrq * _wrq = wrq;
783 : 0 : gwprwrq * _del = NULL;
784 [ # # ]: 0 : while(_wrq) {
785 : 0 : _del = _wrq;
786 : 0 : _wrq = _wrq->next;
787 : 0 : gwpr_buf_free(pr,_del->buf);
788 : 0 : free(_del);
789 : 0 : }
790 : 0 : }
791 : :
792 : : void
793 : 36 : gwprwrq_putback(gwpr * pr, gwrlsrc_file * fsrc, gwprwrq * q) {
794 : 36 : gwprdata * data = fsrc->pdata;
795 : 36 : if(!data->wrq) {
796 : 18 : data->wrq = q;
797 : 18 : } else {
798 : 18 : gwprwrq * tmpq = data->wrq;
799 : 18 : gwprwrq * tmpqlast = NULL;
800 : 18 : data->wrq = q;
801 [ + + ]: 72 : while(q) {
802 : 54 : tmpqlast = q;
803 : 54 : q = q->next;
804 : 54 : }
805 [ + - ]: 18 : if(tmpqlast) tmpqlast->next = tmpq;
806 : 18 : q = tmpq;
807 [ + + ]: 36 : while(q) {
808 : 18 : tmpqlast = q;
809 : 18 : q = q->next;
810 : 18 : }
811 [ + - ]: 18 : if(tmpqlast) {
812 : 18 : data->wrqlast = tmpqlast;
813 : 18 : data->wrqlast->next = NULL;
814 : 18 : }
815 : : }
816 : 36 : }
817 : :
818 : : void
819 : 54 : gwprwrq_add(gwpr * pr, gwrlsrc_file * fsrc, gwprwrq * q) {
820 : 54 : gwprdata * data = fsrc->pdata;
821 : 54 : if(!data->wrq) {
822 : 18 : data->wrq = q;
823 : 18 : q->next = NULL;
824 : 18 : } else {
825 [ + + ]: 36 : if(!data->wrqlast) {
826 : 18 : data->wrqlast = q;
827 : 18 : data->wrq->next = data->wrqlast;
828 : 18 : } else {
829 : 18 : data->wrqlast->next = q;
830 : 36 : data->wrqlast = q;
831 : : }
832 : : }
833 : 54 : }
834 : :
835 : : #ifdef __cplusplus
836 : : }
837 : : #endif
|