From 182df1bda7f8affc3003009c20375e590009838f Mon Sep 17 00:00:00 2001 From: sotech117 Date: Wed, 20 Sep 2023 14:10:45 -0400 Subject: fix memory array --- snowcast_server_concurrent.c | 87 +++++++++++++++++++++++++++++--------------- 1 file changed, 57 insertions(+), 30 deletions(-) (limited to 'snowcast_server_concurrent.c') diff --git a/snowcast_server_concurrent.c b/snowcast_server_concurrent.c index d853507..77072fe 100644 --- a/snowcast_server_concurrent.c +++ b/snowcast_server_concurrent.c @@ -10,6 +10,8 @@ #include #include +#include + #include "protocol.h" @@ -206,7 +208,7 @@ void *print_info_routine(void *arg) { write(print_fd, file_path, strlen(file_path)); for (int j = 0; j < max_active_users; j++) { - if (user_data[j].sockfd == -1) + if (!user_data[j].sockfd || user_data[j].sockfd == -1) continue; if (user_data[j].stationNum == i) { char *localhost_ip = ",127.0.0.1:"; @@ -327,20 +329,24 @@ void *send_udp_packet_routine(void *arg) { // get file path char* file_path = station_data[station_num].filePath; // get current seek chunk - FILE* file_stream = fopen(file_path, "r"); + int stream_fd = open(file_path, O_RDONLY); + if (stream_fd == -1) { + perror("open"); + return (NULL); + } int current_chunk = station_data[station_num].seekIndex; - if (fseek(file_stream, current_chunk, SEEK_SET) == -1) { + if (lseek(stream_fd, current_chunk, SEEK_SET) == -1) { perror("fseek"); return (NULL); } size_t BYTES_PER_SECOND = 16*1024; // read 1000 bytes of the file char file_buffer[BYTES_PER_SECOND]; - if (fread(file_buffer, BYTES_PER_SECOND, 1, file_stream) == -1) { + if (read(stream_fd, file_buffer, BYTES_PER_SECOND) == -1) { perror("fread"); return (NULL); } - fclose(file_stream); + close(stream_fd); // printf("send data: thread %d \n", user_index); // int numbytes; // if ((numbytes = sendto(udp_sockfd, data, strlen(data), 0, @@ -381,14 +387,15 @@ void *send_announce_routine(void *arg) { // if (user_data[i].streamThread == NULL) { // break; // } - if (user_data[i].sockfd == -1) + if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1) { continue; + } // print_user_data(i); // update the station of each user if (user_data[i].stationNum == station_num) { // printf("sending announce to user %d\n", i); - send_announce_reply(user_data[i].sockfd, i); + send_announce_reply(user_data[i].sockfd, station_num); } } } @@ -411,14 +418,24 @@ void *synchronization_thread(void *arg) { { // printf("checking station %d\n", i); // get size of file - FILE* fp = fopen(station_data[i].filePath, "r"); - fseek(fp, 0L, SEEK_END); - size_t size = ftell(fp); - fclose(fp); - if (size == -1) { - perror("ftell"); + struct stat f_info; + // int file_fd = open(station_data[i].filePath, O_RDONLY); + // if (file_fd == -1) { + // perror("open"); + // return (NULL); + // } + if (stat(station_data[i].filePath, &f_info) == -1) { + perror("fstat"); return (NULL); } + + size_t size = f_info.st_size; + + // fclose(file_fd); + // if (size == -1) { + // perror("ftell"); + // return (NULL); + // } station_data[i].seekIndex += BYTES_PER_SECOND; // if the seek index is greater than the size of the file, reset it if (station_data[i].seekIndex >= size) @@ -550,7 +567,7 @@ void *select_thread(void *arg) { // got error or connection closed by client if (nbytes == 0) { // connection closed - printf("selectserver: socket %d hung up\n", i); + // printf("selectserver: socket %d hung up\n", i); } else { perror("recv"); } @@ -620,39 +637,46 @@ void *select_thread(void *arg) { } // END got new incoming connection } // END looping through file descriptors - // broadcast the new files over the udp socket list for each use - } // END for(;;)--and you thought it would never end! } void *init_user(int sockfd) { // add the user to the list of user data pthread_mutex_lock(&mutex_user_data); - // this is to save memory space. // in general, the displacement of 4 is where a user "used to be" - int user_index = max_active_users++; + // int user_index = max_active_users++; + // int running_index = 0; + // while(running_index < max_active_users) + // { + // if (user_data[running_index++].sockfd == -1) + // { + // user_index = running_index; + // break; + // } + // // printf("reusing memory\n"); + // } int running_index = 0; - while(running_index++ < max_active_users) - { - if (user_data[running_index].sockfd == -1) - { - user_index = running_index; + while(running_index < max_active_users) { + if (user_data[running_index].sockfd == -1) { break; } - // printf("reusing memory\n"); + running_index++; } - // have to make more memory - if (user_index == max_active_users) { - ///printf("making new memory\n"); + if (running_index == max_active_users) { + // printf("reached max active users\n"); + printf("making new memory\n"); + max_active_users++; + // TODO: FIX SO THAT IT USES CURRENT USERS, NOT MAX_ACTIVE_USERS FOT THE RESIZE user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users); if (!more_users) { perror("realloc"); exit(1); } user_data = more_users; } + // map TCP sockfd to this user index - user_data[user_index] = (user_t){-1, -1, sockfd, -1}; - sockfd_to_user[sockfd] = user_index; + user_data[running_index] = (user_t){-1, -1, sockfd, -1}; + sockfd_to_user[sockfd] = running_index; // free(user_stream_threads); pthread_mutex_unlock(&mutex_user_data); } @@ -675,16 +699,19 @@ void *print_user_data(int index) { printf("udpPort: %d, stationNum: %d, sockfd: %d, threadId:%d\n", user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd, user_data[index].streamThread); } + void destroy_user(int sockfd) { pthread_mutex_lock(&mutex_user_data); - // stop the thread streaming to the user + // TODO: close the FD in the stream thread pthread_cancel(user_data[sockfd_to_user[sockfd]].streamThread); + // close(user_data[sockfd_to_user[sockfd]].udpPort); // pthread_kill(user_data[sockfd_to_user[sockfd]].streamThread, SIGINT); // "remove" the user from the list of user data user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1, -1}; // map sockfd to -1 sockfd_to_user[sockfd] = -1; + // close(sockfd); pthread_mutex_unlock(&mutex_user_data); } -- cgit v1.2.3-70-g09d2