diff options
author | sotech117 <michael_foiani@brown.edu> | 2023-09-18 02:34:12 -0400 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2023-09-18 02:34:12 -0400 |
commit | 6ed8bef65e54efdb98841ef9b6eb3ea3c9be82d5 (patch) | |
tree | 18569e996f7f2f61de2285ffa54d7854478e132d /snowcast_server_concurrent.c | |
parent | 9e0994d5a997a5b8c298992f05c94ba1b22a692c (diff) |
big progress on streaming in sync and controlling multiple users
Diffstat (limited to 'snowcast_server_concurrent.c')
-rw-r--r-- | snowcast_server_concurrent.c | 457 |
1 files changed, 457 insertions, 0 deletions
diff --git a/snowcast_server_concurrent.c b/snowcast_server_concurrent.c new file mode 100644 index 0000000..7f70294 --- /dev/null +++ b/snowcast_server_concurrent.c @@ -0,0 +1,457 @@ +#include <stdlib.h> +#include <pthread.h> +#include <stdio.h> +#include <unistd.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> +#include <string.h> + +#include "protocol.h" + +typedef struct station { + char* filePath; + int currentChunk; +} station_t; + +typedef struct user { + int udpPort; + int stationNum; + int sockfd; +} user_t; + +#define NUM_STATIONS 2 +#define LINE_MAX 1024 +#define MAX_USERS 1000 + +/* For safe condition variable usage, must use a boolean predicate and */ +/* a mutex with the condition. */ +int count = 0; +pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + + +int start_threads = 0; + +int max_active_users = 0; + +pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER; +// array from fd to users +user_t *user_data; +// array from fd to user's stream thread +pthread_t *user_stream_threads; +station_t station_data[NUM_STATIONS]; +int sockfd_to_user[MAX_USERS + 4]; + +char* port = "4950"; + +void *send_udp_packet_routine(void* arg); +void *select_thread(void* arg); +void *synchronization_thread(void* arg); + +void *get_in_addr(struct sockaddr *sa); + +void *init_user(int sockfd); +void *update_user_udpPort(int sockfd, int udpPort); +void *update_user_station(int sockfd, int stationNum); +void *print_user_data(int sockfd); +void *destroy_user(int sockfd); + + +// void *load_file(void* arg); + +main(int argc, char *argv[]) +{ + // temporary + station_data[0] = (station_t) {"mp3/Beethoven-SymphonyNo5.mp3", 0}; + station_data[1] = (station_t) {"mp3/DukeEllington-Caravan.mp3", 0}; + + // threads to control reading files at chunks while the other threads sleep + + user_data = malloc(sizeof(user_t) * max_active_users); + if (!user_data) { perror("malloc"); return 1; } + user_stream_threads = malloc(sizeof(pthread_t) * max_active_users); + if (!user_stream_threads) + { + perror("malloc"); return 1; + } + + // thread that manages file descriptors + pthread_t s_thread, sync_thread; + pthread_create(&s_thread, NULL, select_thread, NULL); + + // starts the threads after created + // sleep(1); + // startThreads = 0; + // pthread_cond_broadcast(&cond); + + // command line interface + char input[LINE_MAX]; + while (1) { + char *line = fgets(input, LINE_MAX, stdin); + + if (line == NULL) { + continue; + } else if (strncmp("q\n", input, LINE_MAX) == 0) { + // end code if type in q + printf("Exiting.\n"); + break; + } else if (strncmp("p\n", input, LINE_MAX) == 0) { + // print all user data + for (int i = 0; i < max_active_users; i++) { + print_user_data(i); + } + } else if (strncmp("s\n", input, LINE_MAX) == 0) { + // start the streaming threads + pthread_create(&sync_thread, NULL, synchronization_thread, NULL); + } + } + + return 0; +} + +/* Make the manager routine */ +void *send_udp_packet_routine(void *arg) { + // pthread_mutex_lock(&mutex); + // while(startThreads) { + // pthread_cond_wait(&cond, &mutex); + // } + // pthread_mutex_unlock(&mutex); + int did_send_data = 0; + int did_load_data = 0; + + int * i = (int *) arg; + while (1) + { + pthread_mutex_lock(&mutex); + if (!start_threads && did_send_data && did_load_data) { + did_load_data = 0; + did_send_data = 0; + } + while(!start_threads) { + pthread_cond_wait(&cond, &mutex); + } + pthread_mutex_unlock(&mutex); + if(!did_send_data && start_threads) { + printf("send data: thread %d \n", i); + did_send_data = 1; + } + if(!did_load_data && start_threads) { + printf("load data: thread %d \n", i); + did_load_data = 1; + } + } + return NULL; +} + +// /* Make the manager routine */ +// void *load_file(void *arg) { +// // read first data off the files +// pthread_mutex_lock(&mutex); +// while(startThreads) { +// pthread_cond_wait(&cond, &mutex); +// } +// pthread_mutex_unlock(&mutex); + +// int * i = (int *) arg; +// while (1) +// { +// pthread_mutex_lock(&mutex); +// while(startThreads) { +// pthread_cond_wait(&cond, &mutex); +// } +// pthread_mutex_unlock(&mutex); +// /* Do some work. */ +// printf("Thread %d \n", i); +// // send data to port +// // read data coming in off the file +// // sleep for a secong +// sleep(1); +// } +// return NULL; +// } + +void *synchronization_thread(void *arg) { + int c = 0; + while (1) + { + start_threads = 1; + printf("\nbroadcast %d\n", c++); + pthread_cond_broadcast(&cond); + usleep(10000); + start_threads = 0; + sleep(1); + } +} + +void *select_thread(void *arg) { + fd_set master; // master file descriptor list + fd_set read_fds; // temp file descriptor list for select() + int fdmax; // maximum file descriptor number + + int listener; // listening socket descriptor + int newfd; // newly accept()ed socket descriptor + struct sockaddr_storage remoteaddr; // client address + socklen_t addrlen; + + char buf[256]; // buffer for client data + int nbytes; + + + char remoteIP[INET6_ADDRSTRLEN]; + + int yes=1; // for setsockopt() SO_REUSEADDR, below + int i, j, rv; + + struct addrinfo hints, *ai, *p; + + // const char* port = argv[1]; + + FD_ZERO(&master); // clear the master and temp sets + FD_ZERO(&read_fds); + + // LISTENER: get us a socket and bind it + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + if ((rv = getaddrinfo(NULL, port, &hints, &ai)) != 0) { + fprintf(stderr, "snowcast_server: %s\n", gai_strerror(rv)); + exit(1); + } + + for(p = ai; p != NULL; p = p->ai_next) { + listener = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (listener < 0) { + continue; + } + + // lose the pesky "address already in use" error message + setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); + + if (bind(listener, p->ai_addr, p->ai_addrlen) < 0) { + close(listener); + continue; + } + + break; + } + + // if we got here, it means we didn't get bound + if (p == NULL) { + fprintf(stderr, "snowcast_server: failed to bind\n"); + exit(2); + } + + freeaddrinfo(ai); // all done with this + + // listen + if (listen(listener, 10) == -1) { + perror("listen"); + exit(3); + } + + // add the listener to the master set + FD_SET(listener, &master); + + // keep track of the biggest file descriptor + fdmax = listener; // so far, it's this one + + while(1==1) { + read_fds = master; // copy it + if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) { + perror("select"); + exit(4); + } + + // run through the existing connections looking for data to read + for(i = 0; i <= fdmax; i++) { + if (FD_ISSET(i, &read_fds)) { // we got one!! + if (i == listener) { + // handle new connections + addrlen = sizeof remoteaddr; + newfd = accept(listener, + (struct sockaddr *)&remoteaddr, + &addrlen); + + if (newfd == -1) { + perror("accept"); + } else { + FD_SET(newfd, &master); // add to master set + if (newfd > fdmax) { // keep track of the max + fdmax = newfd; + } + printf("selectserver: new connection from %s on " + "socket %d\n.", + inet_ntop(remoteaddr.ss_family, + get_in_addr((struct sockaddr*)&remoteaddr), + remoteIP, INET6_ADDRSTRLEN), + newfd); + + // init user with this newfd + init_user(newfd); + + // send the welcome message to client + struct Welcome welcome; + welcome.replyType = 2; + welcome.numStations = htons(NUM_STATIONS); + if ((send(newfd, &welcome, sizeof(struct Welcome), 0)) == -1) + perror("send"); + } + } else { + // handle data from a client + struct Command command; + if ((nbytes = recv(i, (char*)&command, sizeof(struct Command), 0)) <= 0) { + // got error or connection closed by client + if (nbytes == 0) { + // connection closed + printf("selectserver: socket %d hung up\n", i); + } else { + perror("recv"); + } + close(i); // bye! + FD_CLR(i, &master); // remove from master set + // remove user from data structures + destroy_user(i); + } else { + // we got some data from a client + if (command.commandType == 0) { + // hello message with udpPort + printf("udpPort (from Hello) for new connection is %d.\n", ntohs(command.number)); + // update udp port of user + update_user_udpPort(i, ntohs(command.number)); + + + + // // TALKER: get us a udp socket and bind it + // struct addrinfo hintsUdp, *servinfoUdp, *pUdp; + // int rvUdp, sockfdUdp, numbytesUdp; + // memset(&hintsUdp, 0, sizeof hintsUdp); + // hintsUdp.ai_family = AF_INET; // IPv4 + // hintsUdp.ai_socktype = SOCK_DGRAM; // UDP + // if ((rvUdp = getaddrinfo(argv[1], command.number, &hints, &servinfoUdp)) != 0) { + // fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rvUdp)); + // return 1; + // } + + // // loop through all the results and make a socket + // for(p = servinfoUdp; p != NULL; p = p->ai_next) { + // if ((sockfdUdp = socket(p->ai_family, p->ai_socktype, + // p->ai_protocol)) == -1) { + // perror("talker: socket"); + // continue; + // } + // break; + // } + + // if (p == NULL) { + // fprintf(stderr, "talker: failed to create socket\n"); + // return 2; + // } + + + // if ((numbytesUdp = sendto(sockfdUdp, "test", strlen("test"), 0, + // p->ai_addr, p->ai_addrlen)) == -1) { + // perror("talker: sendto"); + // exit(1); + // } + + // freeaddrinfo(servinfoUdp); + + // printf("talker: sent %d bytes to %d\n", numbytesUdp, sockfdUdp); + // close(sockfdUdp); + } + else if (command.commandType == 1) { + // setStation command for the user + printf("TODO: set station to %d\n", ntohs(command.number)); + // update station of user + update_user_station(i, ntohs(command.number)); + } + else { + // send back in invalid command + struct InvalidCommand invalid; + invalid.replyType = 4; + invalid.replyStringSize = 21; + // make a string with the command.commmandType type in it + invalid.replyString = "Invalid command type"; + if ((send(i, &invalid, sizeof(struct InvalidCommand), 0)) == -1) + perror("send"); + + // drop connection upon invalid command + close(i); + FD_CLR(i, &master); + } + } + } // END handle data from client + } // END got new incoming connection + } // END looping through file descriptors + + // broadcast the new files over the udp socket list for each use + + } // END for(;;)--and you thought it would never end! +} + +void *init_user(int sockfd) { + // add the user to the list of user data + pthread_mutex_lock(&mutex_user_data); + + // this is to save memory space. + // in general, the displacement of 4 is where a user "used to be" + int user_index = max_active_users++; + if(user_data[sockfd-4].sockfd == -1) { + printf("reusing memory\n"); + user_index = sockfd - 4; + } else { + printf("making new memory\n"); + // have to make more memory + user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users); + if (!more_users) { perror("realloc"); exit(1); } + user_data = more_users; + pthread_t *more_stream_threads = realloc(user_stream_threads, sizeof(pthread_t) * max_active_users); + if (!more_stream_threads) { perror("realloc"); exit(1); } + user_stream_threads = more_stream_threads; + } + // map sockfd to this user index & create its stream thread + user_data[user_index] = (user_t) {-1, -1, sockfd}; + sockfd_to_user[sockfd] = user_index; + pthread_create(&user_stream_threads[user_index], NULL, send_udp_packet_routine, (void *)sockfd); + // free(user_stream_threads); + pthread_mutex_unlock(&mutex_user_data); +} +void *update_user_udpPort(int sockfd, int udpPort) { + pthread_mutex_lock(&mutex_user_data); + user_data[sockfd_to_user[sockfd]].udpPort = udpPort; + pthread_mutex_unlock(&mutex_user_data); +} +void *update_user_station(int sockfd, int stationNum) { + pthread_mutex_lock(&mutex_user_data); + user_data[sockfd_to_user[sockfd]].stationNum = stationNum; + pthread_mutex_unlock(&mutex_user_data); +} +void *print_user_data(int index) { + printf("udpPort: %d, stationNum: %d, sockfd: %d\n", + user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd); +} +void *destroy_user(int sockfd) { + pthread_mutex_lock(&mutex_user_data); + + // stop the thread streaming to the user + pthread_cancel(user_stream_threads[sockfd_to_user[sockfd]]); + // "remove" the user from the list of user data + user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1}; + // map sockfd to -1 + sockfd_to_user[sockfd] = -1; + + pthread_mutex_unlock(&mutex_user_data); +} + + +void *get_in_addr(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); + } + + return &(((struct sockaddr_in6*)sa)->sin6_addr); +}
\ No newline at end of file |