00001 #include "sockRoutines.h"
00002
00003 #ifndef CMK_NO_SOCKETS
00004
00005 #include <stdio.h>
00006 #include <errno.h>
00007 #include <stdlib.h>
00008 #include <string.h>
00009 #include <signal.h>
00010 #include <time.h>
00011 #include <ctype.h>
00012 #if CMK_USE_POLL
00013 #include <poll.h>
00014 #endif
00015
00016 #if CMK_BPROC
00017 #include <sys/bproc.h>
00018 #endif
00019
00020 #if CMK_USE_CONVERSE
00021 # include "converse.h"
00022 #else
00023 # define CMI_TMP_SKIP
00024 # define CmiTmpAlloc(size) malloc(size)
00025 # define CmiTmpFree(ptr) free(ptr)
00026 #endif
00027
00028 #if !CMK_HAS_SOCKLEN
00029 typedef int socklen_t;
00030 #endif
00031
00032 #if CMK_HAS_GETIFADDRS
00033 #include <netinet/in.h>
00034 #include <ifaddrs.h>
00035 #include <net/if.h>
00036 #endif
00037
00038
/*
 * Default fatal-error handler installed in skt_abort.
 * Prints the error code and message to stderr, then terminates the
 * process with exit status 1.  The trailing return is never reached; it
 * only keeps the function compatible with the abort-handler signature.
 */
static int default_skt_abort(int code, const char *msg)
{
  fprintf(stderr, "Fatal socket error: code %d-- %s\n", code, msg);
  exit(1);
  return -1;
}
00045
00046 static skt_idleFn idleFunc=NULL;
00047 static skt_abortFn skt_abort=default_skt_abort;
00048 void skt_set_idle(skt_idleFn f) {idleFunc=f;}
00049 skt_abortFn skt_set_abort(skt_abortFn f)
00050 {
00051 skt_abortFn old=skt_abort;
00052 skt_abort=f;
00053 return old;
00054 }
00055
00056
00057
00058
/*
 * Nonzero while a socket call that may raise SIGPIPE is in progress.
 * The flag is written by normal code and read from the SIGPIPE handler,
 * so it is declared volatile sig_atomic_t: that is the only object type
 * the C standard guarantees can be safely shared with a signal handler.
 */
static volatile sig_atomic_t skt_ignore_SIGPIPE = 0;
00060
00061 #if defined(_WIN32) && !defined(__CYGWIN__)
/* Exit hook registered by skt_init(): shuts Winsock down. */
static void doCleanup(void)
{
  WSACleanup();
}
00064 static int skt_inited=0;
00065
00066 void skt_init(void)
00067 {
00068 WSADATA WSAData;
00069 WORD version=0x0002;
00070 if (skt_inited) return;
00071 skt_inited=1;
00072 WSAStartup(version, &WSAData);
00073 atexit(doCleanup);
00074 }
00075
00076 void skt_close(SOCKET fd)
00077 {
00078 closesocket(fd);
00079 }
00080 #else
00081
00082 typedef void (*skt_signal_handler_fn)(int sig);
00083 static skt_signal_handler_fn skt_fallback_SIGPIPE=NULL;
00084 static void skt_SIGPIPE_handler(int sig) {
00085 if (skt_ignore_SIGPIPE) {
00086 fprintf(stderr,"Caught SIGPIPE.\n");
00087 signal(SIGPIPE,skt_SIGPIPE_handler);
00088 }
00089 else
00090 skt_fallback_SIGPIPE(sig);
00091 }
00092
00093 void skt_init(void)
00094 {
00095
00096
00097
00098
00099 skt_fallback_SIGPIPE=signal(SIGPIPE,skt_SIGPIPE_handler);
00100 }
00101 void skt_close(SOCKET fd)
00102 {
00103 skt_ignore_SIGPIPE=1;
00104 close(fd);
00105 skt_ignore_SIGPIPE=0;
00106 }
00107 #endif
00108
00109 #ifndef SKT_HAS_BUFFER_BEGIN
00110 void skt_buffer_begin(SOCKET sk) {}
00111 void skt_buffer_end(SOCKET sk) {}
00112 #endif
00113
00114
00115 static int ERRNO = -1;
00116
00117
00118
00119
00120
00121
00122 static int skt_should_retry(void)
00123 {
00124 int isinterrupt=0,istransient=0,istimeout=0;
00125 #if defined(_WIN32) && !defined(__CYGWIN__)
00126 int err=WSAGetLastError();
00127 if (err==WSAEINTR) isinterrupt=1;
00128 if (err==WSATRY_AGAIN||err==WSAECONNREFUSED)
00129 istransient=1;
00130 #else
00131 int err=errno;
00132 if (err==EINTR) isinterrupt=1;
00133 if (err==ETIMEDOUT) istimeout=1;
00134 if (err==EAGAIN||err==ECONNREFUSED
00135 ||err==EWOULDBLOCK||err==ENOBUFS
00136 #ifndef __CYGWIN__
00137 ||err==ECONNRESET
00138 #endif
00139 )
00140 istransient=1;
00141 #endif
00142 ERRNO = err;
00143 if (isinterrupt) {
00144
00145 if (idleFunc!=NULL) idleFunc();
00146 }
00147 else if (istransient)
00148 {
00149 if (idleFunc!=NULL) idleFunc();
00150 else sleep(1);
00151 }
00152 else
00153 return 0;
00154
00155
00156 return 1;
00157 }
00158
00159
00160
00161 int skt_tcp_no_nagle(SOCKET fd)
00162 {
00163 int flag, ok;
00164 flag = 1;
00165 ok = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int));
00166 return ok;
00167 }
00168
00169 #if CMK_USE_POLL
00170 int skt_select1(SOCKET fd,int msec)
00171 {
00172 struct pollfd fds[1];
00173 int begin, nreadable;
00174 int sec=msec/1000;
00175 int secLeft=sec;
00176
00177 fds[0].fd=fd;
00178 fds[0].events=POLLIN;
00179
00180 if (msec>0) begin = time(0);
00181 do
00182 {
00183 skt_ignore_SIGPIPE=1;
00184 nreadable = poll(fds, 1, msec);
00185 skt_ignore_SIGPIPE=0;
00186
00187 if (nreadable < 0) {
00188 if (skt_should_retry()) continue;
00189 else skt_abort(93200,"Fatal error in poll");
00190 }
00191 if (nreadable >0) return 1;
00192 }
00193 while(msec>0 && ((secLeft = sec - (time(0) - begin))>0));
00194
00195 return 0;
00196 }
00197
00198 #else
00199
00200
00201 int skt_select1(SOCKET fd,int msec)
00202 {
00203 int sec=msec/1000;
00204 fd_set rfds;
00205 struct timeval tmo;
00206 int secLeft=sec;
00207 int begin, nreadable;
00208
00209 FD_ZERO(&rfds);
00210 FD_SET(fd, &rfds);
00211
00212 if (msec>0) begin = time(0);
00213 do
00214 {
00215 tmo.tv_sec=secLeft;
00216 tmo.tv_usec = (msec-1000*sec)*1000;
00217 skt_ignore_SIGPIPE=1;
00218 nreadable = select(1+fd, &rfds, NULL, NULL, &tmo);
00219 skt_ignore_SIGPIPE=0;
00220
00221 if (nreadable < 0) {
00222 if (skt_should_retry()) continue;
00223 else skt_abort(93200,"Fatal error in select");
00224 }
00225 if (nreadable >0) return 1;
00226 }
00227 while(msec>0 && ((secLeft = sec - (time(0) - begin))>0));
00228
00229 return 0;
00230 }
00231 #endif
00232
00233
00234 skt_ip_t _skt_invalid_ip={{0}};
00235
00236 skt_ip_t skt_my_ip(void)
00237 {
00238 char hostname[1000];
00239 #if CMK_HAS_GETIFADDRS
00240 skt_ip_t ip = _skt_invalid_ip;
00241
00242
00243 struct ifaddrs *interfaces=0;
00244 int ifcount = 0;
00245 if( getifaddrs(&interfaces) == 0 ) {
00246 struct ifaddrs *interface;
00247 for( interface=interfaces; interface; interface=interface->ifa_next ) {
00248 if( (interface->ifa_flags & IFF_UP) && ! (interface->ifa_flags & IFF_LOOPBACK) ) {
00249 const struct sockaddr_in *addr = (const struct sockaddr_in*)interface->ifa_addr;
00250 if( addr && addr->sin_family==AF_INET ) {
00251 ifcount ++;
00252 memcpy(&ip, &addr->sin_addr, sizeof(ip));
00253 }
00254 }
00255 }
00256 freeifaddrs(interfaces);
00257 }
00258
00259 if (ifcount==1) return ip;
00260 #endif
00261
00262 if (gethostname(hostname, 999)==0)
00263 return skt_lookup_ip(hostname);
00264
00265 return _skt_invalid_ip;
00266 }
00267
00268 static int skt_parse_dotted(const char *str,skt_ip_t *ret)
00269 {
00270 int i,v;
00271 *ret=_skt_invalid_ip;
00272 for (i=0;i<sizeof(skt_ip_t);i++) {
00273 if (1!=sscanf(str,"%d",&v)) return 0;
00274 if (v<0 || v>255) return 0;
00275 while (isdigit(*str)) str++;
00276 if (i!=sizeof(skt_ip_t)-1) {
00277 if (*str!='.') return 0;
00278 } else {
00279 if (*str!=0) return 0;
00280 }
00281 str++;
00282 ret->data[i]=(unsigned char)v;
00283 }
00284 return 1;
00285 }
00286
00287
00288 skt_ip_t skt_lookup_ip(const char *name)
00289 {
00290 skt_ip_t ret=_skt_invalid_ip;
00291
00292 if (skt_parse_dotted(name,&ret))
00293 return ret;
00294 else {
00295 struct hostent *h = gethostbyname(name);
00296 if (h==0) return _skt_invalid_ip;
00297 memcpy(&ret,h->h_addr_list[0],h->h_length);
00298 return ret;
00299 }
00300 }
00301
00302
00303
00304
00305 skt_ip_t skt_innode_my_ip(void)
00306 {
00307 #if CMK_BPROC
00308
00309 char hostname[200];
00310 sprintf(hostname, "%d", bproc_currnode());
00311 return skt_innode_lookup_ip(hostname);
00312 #else
00313 return skt_my_ip();
00314 #endif
00315 }
00316
00317 skt_ip_t skt_innode_lookup_ip(const char *name)
00318 {
00319 #if CMK_BPROC
00320 struct sockaddr_in addr;
00321 int len = sizeof(struct sockaddr_in);
00322 if (-1 == bproc_nodeaddr(atoi(name), &addr, &len)) {
00323 return _skt_invalid_ip;
00324 }
00325 else {
00326 skt_ip_t ret;
00327 memcpy(&ret,&addr.sin_addr.s_addr,sizeof(ret));
00328 return ret;
00329 }
00330 #else
00331 return skt_lookup_ip(name);
00332 #endif
00333 }
00334
00335
00336 char *skt_print_ip(char *dest,skt_ip_t addr)
00337 {
00338 char *o=dest;
00339 int i;
00340 for (i=0;i<sizeof(addr);i++) {
00341 const char *trail=".";
00342 if (i==sizeof(addr)-1) trail="";
00343 sprintf(o,"%d%s",(int)addr.data[i],trail);
00344 o+=strlen(o);
00345 }
00346 return dest;
00347 }
00348 int skt_ip_match(skt_ip_t a,skt_ip_t b)
00349 {
00350 return 0==memcmp(&a,&b,sizeof(a));
00351 }
00352 struct sockaddr_in skt_build_addr(skt_ip_t IP,int port)
00353 {
00354 struct sockaddr_in ret={0};
00355 ret.sin_family=AF_INET;
00356 ret.sin_port = htons((short)port);
00357 memcpy(&ret.sin_addr,&IP,sizeof(IP));
00358 return ret;
00359 }
00360
00361 SOCKET skt_datagram(unsigned int *port, int bufsize)
00362 {
00363 int connPort=(port==NULL)?0:*port;
00364 struct sockaddr_in addr=skt_build_addr(_skt_invalid_ip,connPort);
00365 socklen_t len;
00366 SOCKET ret;
00367
00368 retry:
00369 ret = socket(AF_INET,SOCK_DGRAM,0);
00370 if (ret == SOCKET_ERROR) {
00371 if (skt_should_retry()) goto retry;
00372 return skt_abort(93490,"Error creating datagram socket.");
00373 }
00374 if (bind(ret, (struct sockaddr *)&addr, sizeof(addr)) == SOCKET_ERROR)
00375 return skt_abort(93491,"Error binding datagram socket.");
00376
00377 len = sizeof(addr);
00378 if (getsockname(ret, (struct sockaddr *)&addr , &len))
00379 return skt_abort(93492,"Error getting address on datagram socket.");
00380
00381 if (bufsize)
00382 {
00383 len = sizeof(int);
00384 if (setsockopt(ret, SOL_SOCKET , SO_RCVBUF , (char *)&bufsize, len) == SOCKET_ERROR)
00385 return skt_abort(93495,"Error on RCVBUF sockopt for datagram socket.");
00386 if (setsockopt(ret, SOL_SOCKET , SO_SNDBUF , (char *)&bufsize, len) == SOCKET_ERROR)
00387 return skt_abort(93496,"Error on SNDBUF sockopt for datagram socket.");
00388 }
00389
00390 if (port!=NULL) *port = (int)ntohs(addr.sin_port);
00391 return ret;
00392 }
00393 SOCKET skt_server(unsigned int *port)
00394 {
00395 return skt_server_ip(port,NULL);
00396 }
00397
00398 SOCKET skt_server_ip(unsigned int *port,skt_ip_t *ip)
00399 {
00400 SOCKET ret;
00401 socklen_t len;
00402 int on = 1;
00403 int connPort=(port==NULL)?0:*port;
00404 struct sockaddr_in addr=skt_build_addr((ip==NULL)?_skt_invalid_ip:*ip,connPort);
00405
00406 retry:
00407 ret = socket(PF_INET, SOCK_STREAM, 0);
00408
00409 if (ret == SOCKET_ERROR) {
00410 if (skt_should_retry()) goto retry;
00411 else return skt_abort(93483,"Error creating server socket.");
00412 }
00413 setsockopt(ret, SOL_SOCKET, SO_REUSEADDR, (void *) &on, sizeof(on));
00414
00415 if (bind(ret, (struct sockaddr *)&addr, sizeof(addr)) == SOCKET_ERROR)
00416 return skt_abort(93484,"Error binding server socket.");
00417 if (listen(ret,5) == SOCKET_ERROR)
00418 return skt_abort(93485,"Error listening on server socket.");
00419 len = sizeof(addr);
00420 if (getsockname(ret, (struct sockaddr *)&addr, &len) == SOCKET_ERROR)
00421 return skt_abort(93486,"Error getting name on server socket.");
00422
00423 if (port!=NULL) *port = (int)ntohs(addr.sin_port);
00424 if (ip!=NULL) memcpy(ip, &addr.sin_addr, sizeof(*ip));
00425 return ret;
00426 }
00427
00428 SOCKET skt_accept(SOCKET src_fd,skt_ip_t *pip, unsigned int *port)
00429 {
00430 socklen_t len;
00431 struct sockaddr_in addr={0};
00432 SOCKET ret;
00433 len = sizeof(addr);
00434 retry:
00435 ret = accept(src_fd, (struct sockaddr *)&addr, &len);
00436 if (ret == SOCKET_ERROR) {
00437 if (skt_should_retry()) goto retry;
00438 else return skt_abort(93523,"Error in accept.");
00439 }
00440
00441 if (port!=NULL) *port=ntohs(addr.sin_port);
00442 if (pip!=NULL) memcpy(pip,&addr.sin_addr,sizeof(*pip));
00443 return ret;
00444 }
00445
00446
00447 SOCKET skt_connect(skt_ip_t ip, int port, int timeout)
00448 {
00449 struct sockaddr_in addr=skt_build_addr(ip,port);
00450 int ok, begin;
00451 SOCKET ret;
00452
00453 begin = time(0);
00454 while (time(0)-begin < timeout)
00455 {
00456 ret = socket(AF_INET, SOCK_STREAM, 0);
00457 if (ret==SOCKET_ERROR)
00458 {
00459 if (skt_should_retry()) continue;
00460 else return skt_abort(93512,"Error creating socket");
00461 }
00462 ok = connect(ret, (struct sockaddr *)&(addr), sizeof(addr));
00463 if (ok != SOCKET_ERROR)
00464 return ret;
00465 else {
00466 skt_close(ret);
00467 if (skt_should_retry()) continue;
00468 else {
00469 #if ! defined(_WIN32) || defined(__CYGWIN__)
00470 if (ERRNO == ETIMEDOUT) continue;
00471 #endif
00472 return skt_abort(93515,"Error connecting to socket\n");
00473 }
00474 }
00475 }
00476
00477 if (timeout==60)
00478 return skt_abort(93517,"Timeout in socket connect\n");
00479 return INVALID_SOCKET;
00480 }
00481
00482 void skt_setSockBuf(SOCKET skt, int bufsize)
00483 {
00484 int len = sizeof(int);
00485 if (setsockopt(skt, SOL_SOCKET , SO_SNDBUF , (char *)&bufsize, len) == SOCKET_ERROR)
00486 skt_abort(93496,"Error on SNDBUF sockopt for datagram socket.");
00487 if (setsockopt(skt, SOL_SOCKET , SO_RCVBUF , (char *)&bufsize, len) == SOCKET_ERROR)
00488 skt_abort(93496,"Error on RCVBUF sockopt for datagram socket.");
00489 }
00490
00491 int skt_recvN(SOCKET hSocket,void *buff,int nBytes)
00492 {
00493 int nLeft,nRead;
00494 char *pBuff=(char *)buff;
00495
00496 nLeft = nBytes;
00497 while (0 < nLeft)
00498 {
00499 if (0==skt_select1(hSocket,600*1000))
00500 return skt_abort(93610,"Timeout on socket recv!");
00501 skt_ignore_SIGPIPE=1;
00502 nRead = recv(hSocket,pBuff,nLeft,0);
00503 skt_ignore_SIGPIPE=0;
00504 if (nRead<=0)
00505 {
00506 if (nRead==0) return skt_abort(93620,"Socket closed before recv.");
00507 if (skt_should_retry()) continue;
00508 else return skt_abort(93650+hSocket,"Error on socket recv!");
00509 }
00510 else
00511 {
00512 nLeft -= nRead;
00513 pBuff += nRead;
00514 }
00515 }
00516 return 0;
00517 }
00518
00519 int skt_sendN(SOCKET hSocket,const void *buff,int nBytes)
00520 {
00521 int nLeft,nWritten;
00522 const char *pBuff=(const char *)buff;
00523
00524 nLeft = nBytes;
00525 while (0 < nLeft)
00526 {
00527 skt_ignore_SIGPIPE=1;
00528 nWritten = send(hSocket,pBuff,nLeft,0);
00529 skt_ignore_SIGPIPE=0;
00530 if (nWritten<=0)
00531 {
00532 if (nWritten==0) return skt_abort(93720,"Socket closed before send.");
00533 if (skt_should_retry()) continue;
00534 else return skt_abort(93700+hSocket,"Error on socket send!");
00535 }
00536 else
00537 {
00538 nLeft -= nWritten;
00539 pBuff += nWritten;
00540 }
00541 }
00542 return 0;
00543 }
00544
00545
00546
00547
00548 #define skt_sendV_max (16*1024)
00549
00550 int skt_sendV(SOCKET fd,int nBuffers,const void **bufs,int *lens)
00551 {
00552 int b,len=0;
00553 for (b=0;b<nBuffers;b++) len+=lens[b];
00554 if (len<=skt_sendV_max) {
00555 char *buf=(char *)CmiTmpAlloc(skt_sendV_max);
00556 char *dest=buf;
00557 int ret;
00558 for (b=0;b<nBuffers;b++) {
00559 memcpy(dest,bufs[b],lens[b]);
00560 dest+=lens[b];
00561 }
00562 ret=skt_sendN(fd,buf,len);
00563 CmiTmpFree(buf);
00564 return ret;
00565 }
00566 else {
00567 int ret;
00568 for (b=0;b<nBuffers;b++)
00569 if (0!=(ret=skt_sendN(fd,bufs[b],lens[b])))
00570 return ret;
00571 return 0;
00572 }
00573 }
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583
00584
00585
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608 ChMessageInt_t ChMessageInt_new(unsigned int src)
00609 {
00610 int i; ChMessageInt_t ret;
00611 for (i=0;i<4;i++) ret.data[i]=(unsigned char)(src>>(8*(3-i)));
00612 return ret;
00613 }
00614 unsigned int ChMessageInt(ChMessageInt_t src)
00615 {
00616 int i; unsigned int ret=0;
00617 for (i=0;i<4;i++) {ret<<=8;ret+=src.data[i];}
00618 return (int)ret;
00619 }
00620
00621 ChMessageLong_t ChMessageLong_new(CMK_TYPEDEF_UINT8 src)
00622 {
00623 int i; ChMessageLong_t ret;
00624 for (i=0;i<8;i++) ret.data[i]=(unsigned char)(src>>(8*(7-i)));
00625 return ret;
00626 }
00627 CMK_TYPEDEF_UINT8 ChMessageLong(ChMessageLong_t src)
00628 {
00629 int i; CMK_TYPEDEF_UINT8 ret=0;
00630 for (i=0;i<8;i++) {ret<<=8;ret+=src.data[i];}
00631 return (CMK_TYPEDEF_UINT8)ret;
00632 }
00633
00634 int ChMessage_recv(SOCKET fd,ChMessage *dst)
00635 {
00636
00637 if (0!=ChMessageHeader_recv(fd,dst)) return -1;
00638 if (0!=ChMessageData_recv(fd,dst)) return -1;
00639 return 0;
00640 }
00641
00642 int ChMessageHeader_recv(SOCKET fd,ChMessage *dst)
00643 {
00644
00645 if (0!=skt_recvN(fd,(char *)&dst->header,sizeof(dst->header))) return -1;
00646
00647 dst->len=ChMessageInt(dst->header.len);
00648 dst->data=0;
00649 return 0;
00650 }
00651 int ChMessageData_recv(SOCKET fd,ChMessage *dst)
00652 {
00653 dst->data=(char *)malloc(dst->len);
00654
00655 if (0!=skt_recvN(fd,dst->data,dst->len)) return -1;
00656 return 0;
00657 }
00658
00659 void ChMessage_free(ChMessage *doomed)
00660 {
00661 free(doomed->data);
00662 strncpy(doomed->header.type,"Free'd",CH_TYPELEN);
00663 doomed->data=NULL;
00664 doomed->len=-1234;
00665 }
00666 void ChMessageHeader_new(const char *type,int len,ChMessageHeader *dst)
00667 {
00668 dst->len=ChMessageInt_new(len);
00669 if (type==NULL) type="default";
00670 strncpy(dst->type,type,CH_TYPELEN);
00671 }
00672 void ChMessage_new(const char *type,int len,ChMessage *dst)
00673 {
00674 ChMessageHeader_new(type,len,&dst->header);
00675 dst->len=len;
00676 dst->data=(char *)malloc(dst->len);
00677 }
00678 int ChMessage_send(SOCKET fd,const ChMessage *src)
00679 {
00680 const void *bufs[2]; int lens[2];
00681 bufs[0]=&src->header; lens[0]=sizeof(src->header);
00682 bufs[1]=src->data; lens[1]=src->len;
00683 return skt_sendV(fd,2,bufs,lens);
00684 }
00685
00686 #else
00687
00688 skt_ip_t _skt_invalid_ip={{0}};
00689
00690 skt_ip_t skt_my_ip(void)
00691 {
00692 return _skt_invalid_ip;
00693 }
00694
00695 #endif