diff options
author | sotech117 <michael_foiani@brown.edu> | 2023-09-18 23:58:47 -0400 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2023-09-18 23:58:47 -0400 |
commit | 7db333857219362ba14dec132825debc0d940a6c (patch) | |
tree | 084c7f16f0087b03de593396f5994ca8042b0acd /snowcast_server_concurrent.c | |
parent | 779ce6a6885346257b4d18f1efe205984cfff079 (diff) |
cleanup some bugs. you can now listen to the music! good stopping point
Diffstat (limited to 'snowcast_server_concurrent.c')
-rw-r--r-- | snowcast_server_concurrent.c | 121 |
1 files changed, 107 insertions, 14 deletions
diff --git a/snowcast_server_concurrent.c b/snowcast_server_concurrent.c index 03d6414..3b71156 100644 --- a/snowcast_server_concurrent.c +++ b/snowcast_server_concurrent.c @@ -10,13 +10,12 @@ #include "protocol.h" -#define NUM_STATIONS 2 #define LINE_MAX 1024 #define MAX_USERS 1000 #define MAX_PATH 50 typedef struct station { - int currentChunk; + int seekIndex; char* filePath; } station_t; @@ -34,7 +33,10 @@ int count = 0; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t station_mutex = PTHREAD_MUTEX_INITIALIZER; + const char *port; +int num_stations; int start_threads = 0; int max_active_users = 0; @@ -75,6 +77,10 @@ int main(int argc, char *argv[]) } port = argv[1]; + num_stations = argc - 2; + + printf("port: %s\n", port); + printf("num_stations: %d\n", num_stations); // init stations size_t totalSize = 0; @@ -92,7 +98,7 @@ int main(int argc, char *argv[]) } // print all indexes in station data - for (int i = 0; i < NUM_STATIONS; i++) + for (int i = 0; i < num_stations; i++) { printf("station %d: %s\n", i, station_data[i].filePath); } @@ -132,6 +138,27 @@ int main(int argc, char *argv[]) return 0; } +int sendall(int udp_sockfd, char *buf, int *len, struct addrinfo *thread_res) +{ + int MAX_PACKET_SIZE = 512; + int total = 0; // how many bytes we've sent + int bytesleft = *len; // how many we have left to send + int n; + + while(total < *len) { + n = sendto(udp_sockfd, buf+total, MAX_PACKET_SIZE, 0, thread_res->ai_addr, thread_res->ai_addrlen); + // thread_res->ai_addr, thread_res->ai_addrlen)) == -1; + if (n == -1) { break; } + total += n; + bytesleft -= n; + } + + *len = total; // return number actually sent here + + return n==-1?-1:0; // return -1 on failure, 0 on success +} + + /* Make the manager routine */ void *send_udp_packet_routine(void *arg) { // unpack args @@ -198,6 +225,7 @@ void *send_udp_packet_routine(void *arg) { { pthread_cond_wait(&cond, &m); } + int station_num = user_data[user_index].stationNum; if (station_num == -1) { did_work = 1; @@ -207,21 +235,52 @@ void *send_udp_packet_routine(void *arg) { // sendto a random string of data to the user int station_num = user_data[user_index].stationNum; char *data = station_data[station_num].filePath; - printf("load data: thread %d \n", user_index); - int numbytes; - if ((numbytes = sendto(udp_sockfd, data, strlen(data), 0, - thread_res->ai_addr, thread_res->ai_addrlen)) == -1) { - perror("talker: sendto"); + // printf("load data: thread %d \n", user_index); + + // get file path + char* file_path = station_data[station_num].filePath; + // get current seek chunk + int current_chunk = station_data[station_num].seekIndex; + FILE* file_stream = fopen(file_path, "r"); + if (fseek(file_stream, current_chunk, SEEK_SET) == -1) { + perror("fseek"); return (NULL); } - printf("send data: thread %d \n", user_index); + size_t BYTES_PER_SECOND = 16*1024; + // read 1000 bytes of the file + char file_buffer[BYTES_PER_SECOND]; + if (fread(file_buffer, BYTES_PER_SECOND, 1, file_stream) == -1) { + perror("fread"); + return (NULL); + } + // printf("send data: thread %d \n", user_index); + // int numbytes; + // if ((numbytes = sendto(udp_sockfd, data, strlen(data), 0, + // thread_res->ai_addr, thread_res->ai_addrlen)) == -1) { + // perror("talker: sendto"); + // return (NULL); + // } + // print the size of the file_buffer + // printf("size of file_buffer: %lu\n", sizeof(file_buffer)); + + int bytes_sent = sizeof(file_buffer); + if (sendall(udp_sockfd, file_buffer, &bytes_sent, thread_res) == -1) + { + perror("sendall"); + printf("We only sent %d bytes because of the error!\n", bytes_sent); + } + // printf("We sent all %d bytes!\n", bytes_sent); did_work = 1; + + close(file_stream); + + usleep(400000); } pthread_mutex_unlock(&m); - usleep(500000); + usleep(100000); } return NULL; } @@ -231,11 +290,39 @@ void *synchronization_thread(void *arg) { while (1) { start_threads = 1; - printf("\nbroadcast %d\n", c++); + // printf("\nbroadcast %d\n", c++); pthread_cond_broadcast(&cond); usleep(2000); start_threads = 0; - usleep(1000000-2000); + // printf("before loop"); + // update file seek index for each station + size_t BYTES_PER_SECOND = 16*1024; + // print num_stations + // printf("num_stations: %d\n", num_stations); + for (int i = 0; i < num_stations; i++) + { + // printf("checking station %d\n", i); + // get size of file + FILE* fp = fopen(station_data[i].filePath, "r"); + fseek(fp, 0L, SEEK_END); + size_t size = ftell(fp); + if (size == -1) { + perror("ftell"); + return (NULL); + } + station_data[i].seekIndex += BYTES_PER_SECOND; + // if the seek index is greater than the size of the file, reset it + if (station_data[i].seekIndex >= size) + { + // printf("resetting seek index for station %d\n", i); + station_data[i].seekIndex = 0; + } + fclose(fp); + } + + + usleep(2000); + usleep(1000000-4000); } } @@ -348,9 +435,15 @@ void *select_thread(void *arg) { // send the welcome message to client struct Welcome welcome; welcome.replyType = 2; - welcome.numStations = htons(NUM_STATIONS); - if ((send(newfd, &welcome, sizeof(struct Welcome), 0)) == -1) + welcome.numStations = htons(num_stations); + int numbytes; + if ((numbytes=send(newfd, &welcome, sizeof(struct Welcome), 0)) == -1) perror("send"); + + //print the num bytes + // print the size of the struct welcome + printf("size of welcome struct: %lu\n", sizeof(struct Welcome)); + printf("sent %d bytes\n", numbytes); } } else { // handle data from a client |