/* Server end for Assignment 2: socket programming */ /* with multiple concurrent TCP connections in parallel! */ /* */ /* Usage: cc -pthread -o crappserver crappserver.c */ /* ./crappserver */ /* On some machines, you might need -lsocket -lxnet */ /* */ /* Written by Carey Williamson January 12, 2013 */ /* Include files for C socket programming and stuff */ #include #include #include #include #include #include #include #include #include /* Global manifest constants */ #define MAX_MESSAGE_LENGTH 100 #define MAX_FILENAME_LENGTH 20 #define MAX_FILE_SIZE 2*1024*1024 #define MYPORTNUM 9000 #define MAX_PARALLELISM 8 /* Session commands */ #define PARALLEL 'P' #define CHUNK 'C' #define GETFILE 'G' #define QUIT 'Q' /* Optional verbose debugging output */ /* #define DEBUG 1 */ /* #define DEBUG2 1 */ /* #define DEBUG3 1 */ /* CRAPP parameters */ int P; int C; /* Global variables */ char filename[MAX_FILENAME_LENGTH]; char fileout[MAX_FILE_SIZE]; int needed[MAX_PARALLELISM]; /* to manage thread termination */ int chosen[MAX_PARALLELISM]; /* to manage data transfer parallelism */ int bytestogo; int baseport; pthread_t sessiontid; /* session thread identifier */ pthread_t tid[MAX_PARALLELISM]; /* identifiers for concurrent worker threads */ void *session(void *param); /* the main session thread */ void *workeri(void *param); /* the parallel worker threads */ int main() { pthread_attr_t attr; /* set of thread attributes */ int i; /* initialize default values for CRAPP */ P = 1; C = 1; /* get the default attributes */ pthread_attr_init(&attr); /* create the interactive session thread on port MYPORTNUM */ pthread_create(&sessiontid, &attr, session, NULL); /* create the parallel connection worker threads on higher ports */ baseport = MYPORTNUM+1; for( i = 0; i < MAX_PARALLELISM; i++ ) { needed[i] = 1; chosen[i] = 0; pthread_create(&tid[i], &attr, workeri, NULL); } /* wait for the session thread to exit */ pthread_join(sessiontid, NULL); /* wait for all of the worker threads to exit */ for( i = 0; i < MAX_PARALLELISM; i++ ) { pthread_join(tid[i], NULL); } } /* Each worker thread will begin control in this function. */ void *workeri(void *param) { struct sockaddr_in myserver; char mydatabuffer[MAX_FILE_SIZE]; char mymessagein[MAX_MESSAGE_LENGTH]; pthread_t mytid; int myindex, mysockfd, mychildsockfd, mybytes, myresult; int here, there, step; int i; /* Find out my own thread ID amongst the set of parallel worker threads */ mytid = pthread_self(); myindex = -1; for( i = 0; i < MAX_PARALLELISM; i++ ) { if( tid[i] == mytid ) myindex = i; } #ifdef DEBUG2 fprintf(stderr, "Worker thread %d being set up now...\n", myindex); #endif /* Initialize myserver sockaddr structure */ memset(&myserver, 0, sizeof(myserver)); myserver.sin_family = AF_INET; myserver.sin_port = htons(baseport+myindex); myserver.sin_addr.s_addr = htonl(INADDR_ANY); /* set up the transport-level end point to use TCP */ if( (mysockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1 ) { fprintf(stderr, "socket() call failed in worker thread %d!\n", myindex); exit(1); } /* bind a specific address and port to the end point */ if( bind(mysockfd, (struct sockaddr *)&myserver, sizeof(struct sockaddr_in) ) == -1 ) { fprintf(stderr, "bind() call failed in worker thread %d!\n", myindex); exit(1); } #ifdef DEBUG else fprintf(stderr, "Worker thread %d bound to port %d\n", myindex, baseport+myindex); #endif /* start listening for incoming connections from clients */ if( listen(mysockfd, 5) == -1 ) { fprintf(stderr, "listen() call failed in worker thread %d!\n", myindex); exit(1); } /* accept a connection */ if( (mychildsockfd = accept(mysockfd, NULL, NULL)) == -1 ) { fprintf(stderr, "accept() call failed in worker thread %d!\n", myindex); exit(1); } #ifdef DEBUG else fprintf(stderr, "Worker thread %d has contact from client now...\n", myindex); #endif again: while( (needed[myindex]) && (myresult = recv(mychildsockfd, mymessagein, MAX_MESSAGE_LENGTH, 0)) <= 0 ) { if( myresult < 0 ) { fprintf(stderr, "recv() call failed in worker thread %d!\n", myindex); exit(1); } } /* gross temporary hack */ if( needed[myindex] == 0 ) goto alldone; #ifdef DEBUG3 fprintf(stderr, "Worker thread %d received %d byte message: %s\n", myindex, strlen(mymessagein), mymessagein); fprintf(stderr, "Worker thread %d activated for %d byte data transfer of file %s...\n", myindex, bytestogo, filename); #endif /* set up my buffer of data from the global data, and send it to client */ for( i = 0; i < MAX_MESSAGE_LENGTH; i++ ) { mydatabuffer[i] = '\0'; } here = 0; there = myindex*C; step = 0; mybytes = 0; while( there < bytestogo ) { for( step = 0; step < C; step++ ) { if( there+step < bytestogo ) { mydatabuffer[here+step] = fileout[there+step]; #ifdef DEBUG2 fprintf(stderr, "Putting byte %c into position %d\n", mydatabuffer[here+step], here+step); #endif mybytes++; } } here += step; there += P*C; } mydatabuffer[mybytes] = '\0'; #ifdef DEBUG3 fprintf(stderr, "Worker thread %d prepped %d bytes to send...\n", myindex, mybytes); #endif #ifdef DEBUG2 fprintf(stderr, "%s\n", mydatabuffer); #endif if( (myresult = send(mychildsockfd, mydatabuffer, mybytes, 0)) < 0) { fprintf(stderr, "Worker thread %d had problem sending data bytes\n", myindex); } #ifdef DEBUG3 else fprintf(stderr, "Worker thread %d sent %d bytes to client.\n", myindex, myresult); #endif if( needed[myindex] == 1 ) goto again; alldone: /* Clean up and exit */ close(mysockfd); close(mychildsockfd); pthread_exit(0); } /* Thread for the interactive session part */ void *session(void *param) { struct sockaddr_in server; char messagein[MAX_MESSAGE_LENGTH]; char messageout[MAX_MESSAGE_LENGTH]; int parentsockfd, childsockfd; int i, j; int pid; char c; FILE *fp; int index; int done; /* Initialize server sockaddr structure */ memset(&server, 0, sizeof(server)); server.sin_family = AF_INET; server.sin_port = htons(MYPORTNUM); server.sin_addr.s_addr = htonl(INADDR_ANY); /* set up the transport-level end point to use TCP */ if( (parentsockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1 ) { fprintf(stderr, "crappserver: socket() call failed!\n"); exit(1); } /* bind a specific address and port to the end point */ if( bind(parentsockfd, (struct sockaddr *)&server, sizeof(struct sockaddr_in) ) == -1 ) { fprintf(stderr, "crappserver: bind() call failed!\n"); exit(1); } /* start listening for incoming connections from clients */ if( listen(parentsockfd, 5) == -1 ) { fprintf(stderr, "crappserver: listen() call failed!\n"); exit(1); } /* initialize message strings just to be safe (null-terminated) */ for( i = 0; i < MAX_MESSAGE_LENGTH; i++ ) { messagein[i] = '\0'; messageout[i] = '\0'; } fprintf(stderr, "Welcome! I am the CRAPP server!!\n\n"); fprintf(stderr, "Parent process listening on port %d...\n\n", MYPORTNUM); /* Main loop: server loops forever listening for requests */ done = 0; while( done == 0 ) { /* accept a connection */ if( (childsockfd = accept(parentsockfd, NULL, NULL)) == -1 ) { fprintf(stderr, "crappserver: accept() call failed!\n"); exit(1); } /* obtain the message from this client */ while( recv(childsockfd, messagein, MAX_MESSAGE_LENGTH, 0) > 0 ) { /* print out the received message */ #ifdef DEBUG3 printf("Child process received command: %s\n", messagein); #endif if( messagein[0] == QUIT ) { for( i = 0; i < MAX_PARALLELISM; i++ ) { needed[i] = 0; } done = 1; /* create the outgoing message (as an ASCII string) */ sprintf(messageout, "BYE\n"); } if( messagein[0] == PARALLEL ) { /* read in string and convert to integer locally */ P = 0; char *spot = &messagein[2]; P = (*spot) - 48; spot++; while( isdigit(*spot) ) { P *= 10; P += (*spot) - 48; spot++; } if( P > MAX_PARALLELISM ) { fprintf(stderr, "Can't really go that high!\n"); P = MAX_PARALLELISM; } fprintf(stderr, "Setting parallelism to %d\n", P); /* create the outgoing message (as an ASCII string) */ sprintf(messageout, "P=%d C=%d\n", P, C); } if( messagein[0] == CHUNK ) { /* read in string and convert to integer locally */ C = 0; char *spot = &messagein[2]; C = (*spot) - 48; spot++; while( isdigit(*spot) ) { C *= 10; C += (*spot) - 48; spot++; } fprintf(stderr, "Setting chunk size to %d\n", C); /* create the outgoing message (as an ASCII string) */ sprintf(messageout, "P=%d C=%d\n", P, C); } if( messagein[0] == GETFILE ) { for( i = 0; i < strlen(messagein)-2; i++ ) filename[i] = messagein[i+2]; filename[i] = '\0'; fp = fopen(filename, "r"); if( fp == NULL ) { fprintf(stderr, "Unable to open file %s\n", filename); exit(1); } i = 0; while( (c = getc(fp)) != EOF ) { fileout[i] = c; i++; } fileout[i] = '\0'; #ifdef DEBUG3 fprintf(stderr, "That file has %d bytes!\n", i); #endif #ifdef DEBUG2 fprintf(stderr, "Contents of fileout: %s\n", fileout); #endif fclose(fp); bytestogo = i; sprintf(messageout, "%d bytes of file %s coming shortly...\n", bytestogo, filename); #ifdef DEBUG printf("Child about to send file %s of size %d using P = %d and C = %d\n", filename, bytestogo, P, C); #endif /* activate the transfer */ #ifdef DEBUG3 fprintf(stderr, "About to activate %d workers for data transfer...\n", P); #endif for( i = 0; i < MAX_PARALLELISM; i++ ) { if( i < P ) { if( bytestogo < i*C ) { #ifdef DEBUG3 fprintf(stderr, "Don't really need server worker thread %d\n", i); #endif chosen[i] = 0; } else { chosen[i] = 1; #ifdef DEBUG3 fprintf(stderr, "Trying to activate server worker thread %d\n", i); #endif } } else chosen[i] = 0; } } /* send the result message back to the client */ if( send(childsockfd, messageout, strlen(messageout), 0) < 0) { fprintf(stderr, "Child had problem sending message!\n"); } #ifdef DEBUG3 else fprintf(stderr, "Child sent message: %s\n", messageout); #endif /* clear out message strings again to be safe */ for( i = 0; i < MAX_MESSAGE_LENGTH; i++ ) { messagein[i] = '\0'; messageout[i] = '\0'; } } /* when client is no longer sending information to us, */ /* the socket can be closed and the child process terminated */ close(childsockfd); } /* all done! */ printf("Thank you for using the CRAPP server. Have a nice day!\n"); pthread_exit(0); }