/* Client end of a simple demo of socket programming         */
/* with multiple concurrent TCP connections in parallel.     */
/*                                                           */
/* Usage: cc -pthread -o crappclient crappclient.c           */
/*        ./crappclient                                      */
/* On some machines, you might need -lsocket or -lxnet       */
/*                                                           */
/* Written by Carey Williamson   January 12, 2013            */
/* Updated by Carey Williamson   February 9, 2013            */

/* Include files for C socket programming and stuff.                  */
/* NOTE(review): the header names were missing from this copy of the  */
/* file; the list below is the set the code actually requires.        */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>

/* Manifest constants used by client program */
#define MAX_HOSTNAME_LENGTH 64
#define MAX_MESSAGE_LENGTH 100
#define MAX_FILE_SIZE (2*1024*1024)
#define MYPORTNUM 9000   /* must match the server's port! */
#define MAX_PARALLELISM 8
#define MSS 1448

/* Address of the server (hard coded so no hostname resolver is needed). */
/* Use "127.0.0.1" here when the server runs on the local machine.       */
#define SERVER_ADDRESS "136.159.5.22"

/* Debugging flag */
/* #define DEBUG 1 */
/* #define DEBUG2 1 */

/* Menu selections */
#define DISPLAY 'D'
#define REPORT 'R'
#define PARALLEL 'P'
#define CHUNK 'C'
#define GETFILE 'G'
#define WRITE 'W'
#define QUIT 'Q'

/* CRAPP parameters */
int P;   /* number of parallel connections used for a transfer */
int C;   /* chunk size (bytes) used to interleave the file across workers */

/* Global variables */
pthread_t sessiontid;            /* session thread identifier */
pthread_t tid[MAX_PARALLELISM];  /* identifiers for concurrent worker threads */
int needed[MAX_PARALLELISM];     /* to manage thread termination */
int chosen[MAX_PARALLELISM];     /* to manage data transfer parallelism */
int baseport;
int totalbytes, expectedbytes;
char filein[MAX_FILE_SIZE + 1];  /* +1 leaves room for a terminating NUL */
struct timeval start, end;

/* Index handed to each worker thread through its start argument. */
static int workerindex[MAX_PARALLELISM];

/* Serializes updates of totalbytes, which every worker thread adds to. */
static pthread_mutex_t totallock = PTHREAD_MUTEX_INITIALIZER;

/* forward declaration of thread routines */
void *session(void *param);   /* the main session thread */
void *workeri(void *param);   /* the parallel worker threads */
void printmenu(void);

/*
 * Parse the run of decimal digits at the start of s.
 * Returns the value, saturated at MAX_FILE_SIZE so it can never overflow,
 * or -1 when s does not begin with a digit.
 */
static int parse_digits(const char *s)
{
    long value = 0;

    if (!isdigit((unsigned char)*s))
        return -1;

    while (isdigit((unsigned char)*s)) {
        if (value <= MAX_FILE_SIZE)
            value = value * 10 + (*s - '0');
        s++;
    }
    return (value > MAX_FILE_SIZE) ? MAX_FILE_SIZE : (int)value;
}

/*
 * Read one line from stdin into buf (at most size-1 characters are kept,
 * the rest of an over-long line is discarded). The newline is not stored
 * and buf is always NUL-terminated.
 * Returns the number of characters stored, or -1 on end of input when
 * nothing was read.
 */
static int read_command(char *buf, int size)
{
    int len = 0;
    int ch;

    while ((ch = getchar()) != EOF && ch != '\n') {
        if (len < size - 1)
            buf[len++] = (char)ch;
    }
    buf[len] = '\0';

    if (ch == EOF && len == 0)
        return -1;
    return len;
}

/* Return the current value of totalbytes, read under its lock. */
static int current_total(void)
{
    int snapshot;

    pthread_mutex_lock(&totallock);
    snapshot = totalbytes;
    pthread_mutex_unlock(&totallock);
    return snapshot;
}

/*
 * Program entry point: starts the interactive session thread and
 * MAX_PARALLELISM worker threads, then waits for all of them to finish.
 */
int main()
{
    pthread_attr_t attr;   /* set of thread attributes */
    int i;

    /* initialize default values for CRAPP */
    P = 1;
    C = 1;

    /* get the default attributes */
    pthread_attr_init(&attr);

    /* create the interactive session thread on port MYPORTNUM */
    if (pthread_create(&sessiontid, &attr, session, NULL) != 0) {
        fprintf(stderr, "crappclient: could not create session thread!\n");
        exit(1);
    }

    /* create the parallel connection worker threads on higher ports */
    baseport = MYPORTNUM + 1;
    for (i = 0; i < MAX_PARALLELISM; i++) {
        needed[i] = 1;
        chosen[i] = 0;
        /* Each worker is told its index directly; looking it up in tid[] */
        /* would race with pthread_create() storing the thread id.        */
        workerindex[i] = i;
        if (pthread_create(&tid[i], &attr, workeri, &workerindex[i]) != 0) {
            fprintf(stderr, "crappclient: could not create worker thread %d!\n", i);
            exit(1);
        }
    }
    pthread_attr_destroy(&attr);

    /* wait for all of the worker threads to exit */
    for (i = 0; i < MAX_PARALLELISM; i++) {
        pthread_join(tid[i], NULL);
    }

    /* wait for the session thread to exit */
    pthread_join(sessiontid, NULL);

    /* all done! */
    printf("Thank you for using the CRAPP client. Have a nice day!\n");
    return 0;
}

/*
 * Each worker thread will begin control in this function.
 *
 * param points to an int holding this worker's index (0..MAX_PARALLELISM-1).
 * The worker connects to port baseport+index on the server, then polls
 * chosen[index] once per second. When chosen, it sends "GO", receives its
 * share of the file and copies it into filein in chunk-interleaved order.
 * It exits once needed[index] is cleared by the session thread.
 * Socket or receive failures terminate the whole program with exit(1).
 */
void *workeri(void *param)
{
    struct sockaddr_in myserver;
    char *mydatabuffer;
    int myindex, mysockfd;
    int mybytes, received;
    int parallelism, chunk;
    int there, step;
    int complete;
    int i;

    if (param == NULL) {
        fprintf(stderr, "Worker thread started without an index!\n");
        exit(1);
    }
    myindex = *(const int *)param;
    if (myindex < 0 || myindex >= MAX_PARALLELISM) {
        fprintf(stderr, "Worker thread started with bad index %d!\n", myindex);
        exit(1);
    }

#ifdef DEBUG
    fprintf(stderr, "Worker thread %d being set up now...\n", myindex);
#endif

    /* The receive buffer lives on the heap: 2 MB is too large for the */
    /* default thread stack on some systems.                           */
    mydatabuffer = malloc(MAX_FILE_SIZE);
    if (mydatabuffer == NULL) {
        fprintf(stderr, "Worker thread %d could not allocate its buffer!\n", myindex);
        exit(1);
    }

    /* Initialize myserver sockaddr structure */
    memset(&myserver, 0, sizeof(myserver));
    myserver.sin_family = AF_INET;
    myserver.sin_port = htons(baseport + myindex);
    myserver.sin_addr.s_addr = inet_addr(SERVER_ADDRESS);

    /* set up the transport-level end point to use TCP */
    if ((mysockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
        fprintf(stderr, "socket() call failed in worker thread %d!\n", myindex);
        exit(1);
    }

    /* connect the socket to the server's port */
    if (connect(mysockfd, (struct sockaddr *)&myserver,
                sizeof(struct sockaddr_in)) == -1) {
        fprintf(stderr, "crappclient: connect() call failed in worker thread %d!\n", myindex);
        exit(1);
    }
#ifdef DEBUG
    fprintf(stderr, "Worker thread %d has contact with server now!\n", myindex);
#endif

    while (needed[myindex] == 1) {
        if (chosen[myindex] != 1) {
            sleep(1);
            continue;
        }

        /* Take the job exactly once; clearing the flag again later could */
        /* swallow a request issued while this transfer is in progress.   */
        chosen[myindex] = 0;

        /* Use one consistent view of the parameters for this transfer. */
        parallelism = (P >= 1) ? P : 1;
        chunk = (C >= 1) ? C : 1;

#ifdef DEBUG
        fprintf(stderr, "Worker thread %d being activated to receive data transfer now...\n", myindex);
#endif
        if (send(mysockfd, "GO", 2, 0) < 0) {
            fprintf(stderr, "Worker thread %d had trouble talking to server!", myindex);
        }
#ifdef DEBUG
        else {
            fprintf(stderr, "Worker thread %d sent GO message to server!\n", myindex);
        }
#endif

        /* See what the server sends to us. Each recv() appends after the  */
        /* data already received. Simple hack to read multiple segments of */
        /* size MSS: keep reading while the last read was a whole number   */
        /* of segments and this worker's share has not fully arrived.      */
        received = 0;
        do {
            mybytes = (int)recv(mysockfd, mydatabuffer + received,
                                (size_t)(MAX_FILE_SIZE - received), 0);
            if (mybytes <= 0) {
                /* an error condition if the server dies unexpectedly */
                printf("Sorry, dude. Worker thread %d did not get anything!\n", myindex);
                close(mysockfd);
                exit(1);
            }
#ifdef DEBUG
            printf("Worker thread %d received %d bytes from server...\n", myindex, mybytes);
            printf("%.*s\n", mybytes, mydatabuffer + received);
#endif
            received += mybytes;
        } while (received < MAX_FILE_SIZE &&
                 received < expectedbytes / parallelism &&
                 mybytes % MSS == 0);

#ifdef DEBUG
        fprintf(stderr, "Worker thread %d updating total bytes to %d\n", myindex, received);
#endif

        /* Put back into correct file order: this worker owns chunk number */
        /* myindex, myindex+P, myindex+2P, ... each of size C bytes.       */
        there = myindex * chunk;
        step = 0;
        for (i = 0; i < received; i++) {
            if (there + step >= MAX_FILE_SIZE)
                break;   /* never write past the end of filein */
            filein[there + step] = mydatabuffer[i];
#ifdef DEBUG2
            fprintf(stderr, "Putting %c at position %d\n", mydatabuffer[i], there + step);
#endif
            step++;
            if (step == chunk) {
                there += parallelism * chunk;
                step = 0;
            }
        }

        /* Account for the new bytes; only one worker can observe the */
        /* total reaching the expected size.                          */
        pthread_mutex_lock(&totallock);
        totalbytes += received;
        complete = (totalbytes == expectedbytes);
        pthread_mutex_unlock(&totallock);

        if (complete) {
#ifdef DEBUG
            fprintf(stderr, "Got it all now, I think!\n");
#endif
            /* Record end time */
            gettimeofday(&end, NULL);
            fprintf(stderr, "Transfer time: %ld usec\n",
                    (long)(end.tv_sec - start.tv_sec) * 1000000L +
                    (long)(end.tv_usec - start.tv_usec));
        }
    }

    /* Clean up and exit */
    free(mydatabuffer);
    close(mysockfd);
    return NULL;
}

/* Prompt the user to enter a command */
void printmenu(void)
{
    printf("\n");
    printf("Please choose from the following selections:\n");
    printf(" P - Set parallelism value\n");
    printf(" C - Set chunk size value\n");
    printf(" G - Get a file\n");
    printf(" D - Display current settings\n");
    printf(" R - Report file status\n");
    printf(" W - Write file to disk\n");
    printf(" Q - Exit program\n");
    printf("Your desired menu selection? ");
}

/*
 * Interactive session portion of client.
 *
 * Connects to the server on MYPORTNUM and runs the menu loop. D, R and W
 * are handled locally; every other command line is sent to the server
 * verbatim and the server's answer is printed. P and C also update the
 * local settings, G starts a parallel transfer, Q shuts everything down.
 * End of input on stdin is treated as Q. param is unused.
 */
void *session(void *param)
{
    int sockfd;
    FILE *fp;
    struct sockaddr_in server;
    char message[MAX_MESSAGE_LENGTH];
    char messageback[MAX_MESSAGE_LENGTH];
    int len, bytes, done;
    int value, count;
    int i;

    (void)param;

    /* Initialization of server sockaddr data structure */
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons(MYPORTNUM);
    server.sin_addr.s_addr = inet_addr(SERVER_ADDRESS);

    /* create the client socket for its transport-level end point */
    if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {
        fprintf(stderr, "crappclient: socket() call failed!\n");
        exit(1);
    }

    /* connect the socket to the server's address */
    if (connect(sockfd, (struct sockaddr *)&server,
                sizeof(struct sockaddr_in)) == -1) {
        fprintf(stderr, "crappclient: connect() call failed!\n");
        exit(1);
    }

    /* Print welcome banner */
    printf("Welcome! I am the CRAPP client!!\n\n");

    /* main loop: issue commands to server, get file, and print answer received */
    done = 0;
    while (done == 0) {
        printmenu();
        len = read_command(message, MAX_MESSAGE_LENGTH);
        if (len < 0) {
            /* End of input: shut down cleanly as if the user had quit. */
            printf("\n");
            message[0] = QUIT;
            message[1] = '\0';
            len = 1;
        }
        if (len == 0)
            continue;   /* empty line: just ask again */

        /* Commands handled entirely on the client side */
        if (message[0] == DISPLAY) {
            printf("Current settings: P=%d C=%d\n", P, C);
            continue;
        }
        if (message[0] == REPORT) {
            count = current_total();
            fprintf(stderr, "Total bytes received for file is %d\n", count);
            if (count > MAX_FILE_SIZE)
                count = MAX_FILE_SIZE;
            if (count < 0)
                count = 0;
            filein[count] = '\0';
            fprintf(stderr, "File contents:\n%s\n", filein);
            continue;
        }
        if (message[0] == WRITE) {
            count = current_total();
            if (count > MAX_FILE_SIZE)
                count = MAX_FILE_SIZE;
            if (count < 0)
                count = 0;
            fp = fopen("result.txt", "w");
            if (fp == NULL) {
                fprintf(stderr, "Failed to open file for writing!\n");
                exit(1);
            }
            count = (int)fwrite(filein, 1, (size_t)count, fp);
            if (fclose(fp) != 0) {
                fprintf(stderr, "Failed to finish writing file result.txt!\n");
            }
            fprintf(stderr, "Wrote %d bytes to file result.txt\n", count);
            continue;
        }

        /* Commands that also go to the server */
        if (message[0] == PARALLEL) {
            /* the value starts at the third character, e.g. "P 4" */
            value = (len >= 3) ? parse_digits(&message[2]) : -1;
            if (value < 1) {
                fprintf(stderr, "Parallelism must be a positive number, e.g. \"P 4\"\n");
                continue;   /* nothing sent, so client and server still agree */
            }
            if (value > MAX_PARALLELISM) {
                fprintf(stderr, "Can't really go that high!\n");
                value = MAX_PARALLELISM;
            }
            P = value;
            fprintf(stderr, "Setting parallelism value to %d\n", P);
        } else if (message[0] == CHUNK) {
            /* the value starts at the third character, e.g. "C 100" */
            value = (len >= 3) ? parse_digits(&message[2]) : -1;
            if (value < 1) {
                fprintf(stderr, "Chunk size must be a positive number, e.g. \"C 100\"\n");
                continue;   /* nothing sent, so client and server still agree */
            }
            C = value;
            fprintf(stderr, "Setting chunk size to %d\n", C);
        } else if (message[0] == GETFILE) {
            expectedbytes = 0;
            pthread_mutex_lock(&totallock);
            totalbytes = 0;
            pthread_mutex_unlock(&totallock);
            /* Record start time */
            gettimeofday(&start, NULL);
        } else if (message[0] == QUIT) {
            /* Deactivate the worker threads */
            for (i = 0; i < MAX_PARALLELISM; i++)
                needed[i] = 0;
            done = 1;
        }

        /* send it to the server via the socket */
        if (send(sockfd, message, (size_t)len, 0) < 0) {
            fprintf(stderr, "crappclient session: Sending command to server failed!\n");
        }
#ifdef DEBUG
        else {
            fprintf(stderr, "crappclient session sent command message \"%s\" to server.\n", message);
        }
#endif

        /* see what the server sends back (leave room for the terminator) */
        bytes = (int)recv(sockfd, messageback, MAX_MESSAGE_LENGTH - 1, 0);
        if (bytes <= 0) {
            /* an error condition if the server dies unexpectedly */
            printf("Sorry, dude. Server failed!\n");
            close(sockfd);
            exit(1);
        }

        /* make sure the message is null-terminated in C */
        messageback[bytes] = '\0';
        fprintf(stderr, "Answer received from server: ");
        fprintf(stderr, "%s\n", messageback);

        if (message[0] == GETFILE) {
            /* the answer starts with the file size in decimal */
            value = parse_digits(messageback);
            if (value < 0) {
                fprintf(stderr, "Server did not report a file size, nothing to receive\n");
                continue;
            }
            expectedbytes = value;
            fprintf(stderr, "Expecting %d bytes from server\n", expectedbytes);

            /* Activate the worker threads to receive data. This happens  */
            /* only now, so that no worker starts before expectedbytes is */
            /* known.                                                     */
            for (i = 0; i < MAX_PARALLELISM; i++) {
                if (i >= P) {
                    chosen[i] = 0;
                } else if (expectedbytes < i * C) {
                    fprintf(stderr, "Don't really need client worker thread %d\n", i);
                    chosen[i] = 0;
                } else {
                    chosen[i] = 1;
                }
            }
        }
    }

    /* Program all done, so clean up and exit the client */
    close(sockfd);
    return NULL;
}