diff options
Diffstat (limited to 'server.c')
-rw-r--r-- | server.c | 157 |
1 files changed, 10 insertions, 147 deletions
@@ -76,8 +76,6 @@ void send_invalid_command_reply(int fd, size_t message_size, char* message); main(int argc, char *argv[]) { - // threads to control reading files at chunks while the other threads sleep - // station_data = malloc(sizeof(station_t) * NUM_STATIONS); // check and assign arguments if (argc < 3) { fprintf(stderr,"usage: ./snowcast_server <listen port> <file0> [file 1] [file 2] ... \n"); @@ -87,9 +85,6 @@ main(int argc, char *argv[]) port = argv[1]; num_stations = argc - 2; - // printf("port: %s\n", port); - // printf("num_stations: %d\n", num_stations); - // init stations size_t totalSize = 0; // get size to malloc @@ -106,12 +101,6 @@ main(int argc, char *argv[]) station_data[i - 2] = (station_t) { 0, argv[i]}; } - // print all indexes in station data - // for (int i = 0; i < num_stations; i++) - // { - // printf("station %d: %s\n", i, station_data[i].filePath); - // } - // make array of user data user_data = malloc(sizeof(user_t) * max_active_users); if (!user_data) { perror("malloc userdata"); return 1; } @@ -251,7 +240,6 @@ int send_all_udp(int udp_sockfd, char *buf, int *len, struct addrinfo *thread_re void udp_port_cleanup_handler(void *arg) { int sockfd = (int) arg; - printf("Called clean-up handler, closing socket %d\n", sockfd); close(sockfd); } @@ -259,9 +247,6 @@ void udp_port_cleanup_handler(void *arg) void *send_udp_packet_routine(void *arg) { // unpack args int user_index = (int) arg; - // printf("thread : user_index: %d\n", user_index); - // print user data - // print_user_data(user_index); // declare vairables to be used int did_work = 1; @@ -271,16 +256,12 @@ void *send_udp_packet_routine(void *arg) { struct addrinfo thread_hints, *thread_res, *thread_servinfo; int error_code; - // TODO: add error checking on these calls*** - // setup hints memset(&thread_hints, 0, sizeof thread_hints); thread_hints.ai_family = AF_INET; // use IPv4 only thread_hints.ai_socktype = SOCK_DGRAM; thread_hints.ai_flags = AI_PASSIVE; // fill in my IP for me - // setup the socket for client listener DATAGRAM (udp) - // cover the port integer to a string int int_port = user_data[user_index].udpPort; int length = snprintf( NULL, 0, "%d", int_port ); char* port = malloc( length + 1 ); @@ -309,10 +290,6 @@ void *send_udp_packet_routine(void *arg) { return (NULL); } - // bind(udp_sockfd, thread_res->ai_addr, thread_res->ai_addrlen); - - - // freeaddrinfo(thread_servinfo) pthread_cleanup_push(udp_port_cleanup_handler, (void *)udp_sockfd); while (1) { @@ -350,29 +327,20 @@ void *send_udp_packet_routine(void *arg) { } size_t BYTES_PER_SECOND = 16*1024; // read 1000 bytes of the file + char file_buffer[BYTES_PER_SECOND]; - if (read(stream_fd, file_buffer, BYTES_PER_SECOND) == -1) { + int bytes_read = 0; + if ((bytes_read = read(stream_fd, file_buffer, BYTES_PER_SECOND)) == -1) { perror("fread"); return (NULL); } close(stream_fd); - // printf("send data: thread %d \n", user_index); - // int numbytes; - // if ((numbytes = sendto(udp_sockfd, data, strlen(data), 0, - // thread_res->ai_addr, thread_res->ai_addrlen)) == -1) { - // perror("talker: sendto"); - // return (NULL); - // } - // print the size of the file_buffer - // printf("size of file_buffer: %lu\n", sizeof(file_buffer)); - - int bytes_sent = sizeof(file_buffer); - if (send_all_udp(udp_sockfd, file_buffer, &bytes_sent, thread_res) == -1) + + if (send_all_udp(udp_sockfd, file_buffer, &bytes_read, thread_res) == -1) { perror("send_all_udp"); - printf("We only sent %d bytes because of the error!\n", bytes_sent); + printf("We only sent %d bytes because of the error!\n", bytes_read); } - // printf("We sent all %d bytes!\n", bytes_sent); did_work = 1; @@ -395,17 +363,12 @@ void *send_announce_routine(void *arg) { // send the announce messages for (int i = 0; i < max_active_users; i++) { - // if (user_data[i].streamThread == NULL) { - // break; - // } if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1) { continue; } - // print_user_data(i); // update the station of each user if (user_data[i].stationNum == station_num) { - // printf("sending announce to user %d\n", i); send_announce_reply(user_data[i].sockfd, station_num); } } @@ -416,25 +379,16 @@ void *synchronization_thread(void *arg) { while (1) { start_threads = 1; - // printf("\nbroadcast %d\n", c++); pthread_cond_broadcast(&cond); usleep(2000); + start_threads = 0; - // printf("before loop"); - // update file seek index for each station size_t BYTES_PER_SECOND = 16*1024; - // print num_stations - // printf("num_stations: %d\n", num_stations); + for (int i = 0; i < num_stations; i++) { - // printf("checking station %d\n", i); // get size of file struct stat f_info; - // int file_fd = open(station_data[i].filePath, O_RDONLY); - // if (file_fd == -1) { - // perror("open"); - // return (NULL); - // } if (stat(station_data[i].filePath, &f_info) == -1) { perror("fstat"); return (NULL); @@ -442,17 +396,11 @@ void *synchronization_thread(void *arg) { size_t size = f_info.st_size; - // fclose(file_fd); - // if (size == -1) { - // perror("ftell"); - // return (NULL); - // } station_data[i].seekIndex += BYTES_PER_SECOND; // if the seek index is greater than the size of the file, reset it if (station_data[i].seekIndex >= size) { station_data[i].seekIndex = 0; - // printf("resetting station %d\n", i); pthread_t send_announce_thread; pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)i); @@ -588,14 +536,6 @@ void *select_thread(void *arg) { continue; } - // printf("selectserver: new connection from %s on " - // "socket %d\n.", - // inet_ntop(remoteaddr.ss_family, - // get_in_addr((struct sockaddr*)&remoteaddr), - // remoteIP, INET6_ADDRSTRLEN), - // newfd); - // init user with this newfd - // get the udp port uint16_t udp_port = -1; int bytes_to_read = sizeof(uint16_t); @@ -607,16 +547,6 @@ void *select_thread(void *arg) { } udp_port = ntohs(udp_port); - // printf("udpPort (from Hello) for new connection is %d.\n", udp_port); - // update udp port of user - - // add a timeout to the socket - // struct timeval tv; - // tv.tv_sec = 0; - // tv.tv_usec = 0; - // if (setsockopt(newfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { - // perror("setsockopt"); - // } FD_SET(newfd, &master); // add to master set if (newfd > fdmax) { // keep track of the max fdmax = newfd; @@ -740,19 +670,7 @@ void *select_thread(void *arg) { void *init_user(int sockfd) { // add the user to the list of user data pthread_mutex_lock(&mutex_user_data); - // this is to save memory space. - // in general, the displacement of 4 is where a user "used to be" - // int user_index = max_active_users++; - // int running_index = 0; - // while(running_index < max_active_users) - // { - // if (user_data[running_index++].sockfd == -1) - // { - // user_index = running_index; - // break; - // } - // // printf("reusing memory\n"); - // } + if(sockfd > max_sockfd) { max_sockfd = sockfd; int * more_sockfd_to_user = realloc(sockfd_to_user, sizeof(int) * (max_sockfd + 1)); @@ -760,7 +678,6 @@ void *init_user(int sockfd) { sockfd_to_user = more_sockfd_to_user; } - int running_index = 0; while(running_index < max_active_users) { if (user_data[running_index].sockfd == -1) { @@ -806,18 +723,14 @@ void *print_user_data(int index) { void destroy_user(int sockfd) { pthread_mutex_lock(&mutex_user_data); // stop the thread streaming to the user - // TODO: close the FD in the stream thread pthread_t thread = user_data[sockfd_to_user[sockfd]].streamThread; if (thread != -1) { pthread_cancel(thread); } - // close(user_data[sockfd_to_user[sockfd]].udpPort); - // pthread_kill(user_data[sockfd_to_user[sockfd]].streamThread, SIGINT); // "remove" the user from the list of user data user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1, -1}; // map sockfd to -1 sockfd_to_user[sockfd] = -1; - // close(sockfd); pthread_mutex_unlock(&mutex_user_data); } @@ -846,13 +759,9 @@ void send_announce_reply(int fd, int station_num) { memcpy(send_buffer + 2, file_path, len_file_path); - // printf("buffer: %s\n", send_buffer); - size_t bytes_to_send = len_file_path + 2; if (send_all(fd, send_buffer, &bytes_to_send) == -1) perror("send_all"); - // print the number of bytes sent - // printf("sent %d bytes\n", bytes_to_send); free(send_buffer); } @@ -870,13 +779,9 @@ void send_invalid_command_reply(int fd, size_t message_size, char* message) { memcpy(send_buffer + 2, message, message_size); - // printf("buffer: %s\n", send_buffer); - int bytes_to_send = message_size + 2; if (send_all(fd, send_buffer, &bytes_to_send) == -1) perror("send"); - // print the number of bytes sent - // printf("sent %d bytes\n", bytessent); free(send_buffer); } @@ -894,46 +799,4 @@ int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) { } return 1; -} - -// int send_all(int s, char *buf, int *len) -// { -// int total = 0; // how many bytes we've sent -// int bytesleft = *len; // how many we have left to send -// int n; - -// while(total < *len) { -// n = send(s, buf+total, bytesleft, 0); -// if (n == -1) { break; } -// total += n; -// bytesleft -= n; -// } - -// *len = total; // return number actually sent here - -// return n==-1?-1:0; // return -1 on failure, 0 on success -// } - -// int recv_all(int sock, char *buffer, int total_size) -// { -// int total_bytes_read = 0; -// int to_read = total_size; - -// char *ptr = buffer; - -// while (to_read > 0) { -// int bytes_read = recv(sock, ptr, to_read, 0); -// if (bytes_read <= 0) { -// if (bytes_read != 0) { -// perror("recv"); -// } -// return -1; -// } - -// to_read -= bytes_read; -// ptr += bytes_read; -// total_bytes_read += bytes_read; -// } - -// return total_bytes_read; -// } +}
\ No newline at end of file |