diff options
author | sotech117 <michael_foiani@brown.edu> | 2023-09-23 17:30:45 -0400 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2023-09-23 17:30:45 -0400 |
commit | 3b2aa8c271bf5cd5497decb6577afe5fd7339f57 (patch) | |
tree | bc1d39ad76b15f58ddf61385645fa87a59fb1157 /server.c | |
parent | b417bcc57b9fd49f360087c32c97293a6bc7d2be (diff) | |
parent | 1e9ac5407ef4f2cddc745f35f33a860446526cea (diff) |
merge post-warmup with main
Diffstat (limited to 'server.c')
-rw-r--r-- | server.c | 955 |
1 files changed, 855 insertions, 100 deletions
@@ -1,154 +1,909 @@ -/* -** server.c -- a stream socket server demo -*/ - -#include <stdio.h> #include <stdlib.h> -#include <stdint.h> +#include <pthread.h> +#include <stdio.h> #include <unistd.h> -#include <errno.h> -#include <string.h> -#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> -#include <netdb.h> #include <arpa/inet.h> -#include <sys/wait.h> -#include <signal.h> +#include <netdb.h> +#include <string.h> + +#include <sys/stat.h> +#include <fcntl.h> +#include <sys/types.h> + +#include "protocol.c" + +#define LINE_MAX 1024 +#define MAX_USERS 1000 +#define MAX_PATH 50 +#define MAX_RATE_PER_SECOND 16*1024 / 2 + +// typedef struct station { +// int streamFd; +// char* filePath; +// int fileBufferSize; +// char fileBuffer[MAX_STREAM_RATE]; +// } station_t; +typedef struct station { + pthread_t streamThread; + int readfd; + char *filePath; +} station_t; +int num_stations; +station_t *stations; +int setup_stations(int argc, char *argv[]); +void *stream_routine(void *arg); + + +typedef struct user { + int udpPort; + int stationNum; + int sockfd; +} user_t; + + +/* For safe condition variable usage, must use a boolean predicate and */ +/* a mutex with the condition. */ +int count = 0; +pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + +pthread_mutex_t station_mutex = PTHREAD_MUTEX_INITIALIZER; + +const char *port; +// int num_stations; + +int start_threads = 0; +int max_active_users = 0; +int max_sockfd = 0; + +pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER; +// array from index to user_data +user_t *user_data; +int *sockfd_to_user; + +// stations array pointer +// station_t *station_data; + +struct udp_packet_routine_args { + int user_index; + int buffer_size; + char *file_buffer; +}; + +void *send_udp_packet_routine(void* arg); +void *select_routine(void* arg); +void *sync_routine(void* arg); +void *send_announce_routine(void* arg); + +void init_station(int station_num, const char *station_name); + +int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]); +void *print_info_routine(void *arg); -#include "protocol.h" +void *get_in_addr(struct sockaddr *sa); -#define BACKLOG 10 // how many pending connections queue will hold +void *init_user(int sockfd); +void *update_user_udpPort(int sockfd, int udpPort); +void *update_user_station(int sockfd, int stationNum); +void *print_user_data(int sockfd); +void destroy_user(int sockfd); -#define MAXDATASIZE 100 // max number of bytes we can get at once +void send_announce_reply(int fd, int station_num); +void send_invalid_command_reply(int fd, size_t message_size, char* message); +// void *load_file(void* arg); -void sigchld_handler(int s) +main(int argc, char *argv[]) { - // waitpid() might overwrite errno, so we save and restore it: - int saved_errno = errno; + // check and assign arguments + if (argc < 3) { + fprintf(stderr,"usage: ./snowcast_server <listen port> <file0> [file 1] [file 2] ... \n"); + exit(1); + } + + // initizlize the port + port = argv[1]; + + // initialize the stations & their threads + if (setup_stations(argc, argv) == -1) { + perror("setup_stations"); + exit(1); + } + + // make array of user data + printf("max active users: %d\n", sizeof(user_t) * max_active_users); + user_data = malloc(sizeof(user_t) * max_active_users); + if (!user_data) { perror("malloc userdata"); return 1; } + sockfd_to_user = malloc(sizeof(int) * max_active_users); + if (!sockfd_to_user) { perror("malloc sockfd to user"); return 1; } + + // make and start "select" thread that manages: + // 1) new connections, 2) requests from current connections, 3) closing connections + pthread_t select_thread; + pthread_create(&select_thread, NULL, select_routine, NULL); + + // start syncchronization thread to broadcast stations + // pthread_t sync_thread; + // pthread_create(&sync_thread, NULL, sync_routine, NULL); + + // command line interface + char input[LINE_MAX]; + memset(input, 0, LINE_MAX); + + char *tokens[LINE_MAX / 2]; + while (read(STDIN_FILENO, input, LINE_MAX) > 0) { + // init tokens + memset(tokens, 0, (LINE_MAX / 2) * sizeof(char *)); + + // if 0, all whitespace + if (!parse(input, tokens)) + continue; - while(waitpid(-1, NULL, WNOHANG) > 0); + char *command = tokens[0]; + // if q, shutdown! + if (!strcmp(command, "q")) { + printf("Exiting.\n"); + // TODO: exit better than break + break; + } + + // if p, print info + else if (!strcmp(command, "p")) { + // get the file descriptor + int print_fd = 0; + // see if there is a file path + char *output_file_path = tokens[1]; + if (output_file_path != NULL) + { + if ((print_fd = open(output_file_path, O_CREAT | O_WRONLY | O_TRUNC, S_IRWXU)) == -1) + { + perror("open"); + continue; + } + } else { + print_fd = STDOUT_FILENO; + } + // printf("print_fd: %d\n", print_fd); + pthread_t print_info_thread; + pthread_create(&print_info_thread, NULL, print_info_routine, print_fd); + // note - this file descriptor is closed in the thread + } + else if (!strcmp(command, "u")) + { + // print all user data + for (int i = 0; i < max_active_users; i++) + { + print_user_data(i); + } + } + } - errno = saved_errno; + return 0; } +int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) { + int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + if (bytes_read < 0) { perror("read (from station file)"); return -1; } + // printf("bytes read: %d\n", bytes_read); + if (bytes_read == 0) { + // printf("end of file, restarting\n"); + pthread_t send_announce_thread; + pthread_create(&send_announce_thread, NULL, send_announce_routine, station_num); -// get sockaddr, IPv4 or IPv6: -void *get_in_addr(struct sockaddr *sa) -{ - if (sa->sa_family == AF_INET) { - return &(((struct sockaddr_in*)sa)->sin_addr); + if (lseek(fd, 0, SEEK_SET) == -1) + { + perror("lseek (in resarting file)"); + return -1; + } + bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; } } - return &(((struct sockaddr_in6*)sa)->sin6_addr); + return bytes_read; +} + +void *stream_routine_cleanup(void *arg) { + int read_fd = (int) arg; + close(read_fd); +} + +void *stream_routine(void *arg) { + int station_num = (int) arg; + printf("stream routine %d\n", station_num); + int read_fd = stations[station_num].readfd; + + pthread_cleanup_push(stream_routine_cleanup, read_fd); + + // make buffer which will be used to stream to children + char buffer[MAX_RATE_PER_SECOND]; + memset(buffer, 0, MAX_RATE_PER_SECOND); + // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); } + + for (;;) + { + // load bytes into buffer + int bytes_read = read_file(read_fd, buffer, station_num); + if (bytes_read == -1) { exit(1); } + + // TODO: send buffer to children + char *send_buffer = malloc(2 + bytes_read); + for (int i = 0; i < max_active_users; i++) + { + if (!user_data[i].sockfd || user_data[i].sockfd == -1) + continue; + if (user_data[i].stationNum == station_num) + { + // send the udp packet + int *send_buffer = malloc(2 + bytes_read); + memset(send_buffer, 0, 2 + bytes_read); + send_buffer[0] = i; + send_buffer[1] = bytes_read; + memcpy(send_buffer+2, buffer, bytes_read); + // printf("sending udp packet to user %d\n", i); + pthread_t t; + pthread_create(&t, NULL, send_udp_packet_routine, send_buffer); + } + } + free(send_buffer); + usleep(1000000 / 2 - 5000); + start_threads = 1; + pthread_cond_broadcast(&cond); + + usleep(5000); + start_threads = 0; + + memset(buffer, 0, MAX_RATE_PER_SECOND); + } + + return (NULL); + + pthread_cleanup_pop(1); } -int main(int argc, char *argv[]) +int setup_stations(int argc, char *argv[]) { + num_stations = argc - 2; + + // get the size to malloc + int totalSize = 0; + for(int i = 2; i < argc; i++) + { + totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]); + } + + // malloc the stations array + stations = malloc(totalSize); + if (!stations) { perror("malloc (stations pointer)"); return -1; } + // assign the stations, and start the threads + for (int i = 0; i < num_stations; i++) { + stations[i].filePath = argv[i+2]; + stations[i].readfd = open(argv[i+2], O_RDONLY); + if (stations[i].readfd < 0) { perror("read (from station file)"); return -1; } + pthread_create(&stations[i].streamThread, NULL, stream_routine, i); + } + + printf("successfully created %d stations\n", num_stations); + return 1; +} + +void write_int_to_fd(int fd, int n) { + int l = snprintf(NULL, 0, "%d", n); + char *num = malloc(l + 1); + if (!num) { perror("malloc write to fd"); return; } + + snprintf(num, l + 1, "%d", n); + if (write(fd, num, strlen(num)) == -1) { + perror("write"); + } + + + free(num); +} + +void *print_info_routine(void *arg) { + int print_fd = (int) arg; + // printf("thread print_fd: %d\n", print_fd); + // printf("num_stations: %d\n", num_stations); + for (int i = 0; i < num_stations; i++) { + write_int_to_fd(print_fd, i); + char *comma = ","; + write(print_fd, comma, strlen(comma)); + + // write file path + char* file_path = stations[i].filePath; + write(print_fd, file_path, strlen(file_path)); + + for (int j = 0; j < max_active_users; j++) { + if (!user_data[j].sockfd || user_data[j].sockfd == -1) + continue; + if (user_data[j].stationNum == i) { + char *localhost_ip = ",127.0.0.1:"; + write(print_fd, localhost_ip, strlen(localhost_ip)); + // write udpPort + write_int_to_fd(print_fd, user_data[j].udpPort); + } + } + // wrtie new line + char *newline = "\n"; + write(print_fd, newline, strlen(newline)); + } + + if (print_fd != STDOUT_FILENO) close(print_fd); + return (NULL); +} + +int send_all_udp(int udp_sockfd, char *buf, int *len, struct addrinfo *thread_res) { - int sockfd, new_fd, numbytes, b; // listen on sock_fd, new connection on new_fd - char buf[MAXDATASIZE]; - struct addrinfo hints, *servinfo, *p; - struct sockaddr_storage their_addr; // connector's address information - socklen_t sin_size; - struct sigaction sa; - int yes=1; - char s[INET6_ADDRSTRLEN]; - int rv; + int MAX_PACKET_SIZE = 512; + int total = 0; // how many bytes we've sent + int bytesleft = *len; // how many we have left to send + int n; - if (argc < 3) { - fprintf(stderr,"usage: <listen port> <file0> [file 1] [file 2] ... \n"); - exit(1); + while(total < *len) { + n = sendto(udp_sockfd, buf+total, MAX_PACKET_SIZE, 0, thread_res->ai_addr, thread_res->ai_addrlen); + // thread_res->ai_addr, thread_res->ai_addrlen)) == -1; + if (n == -1) { break; } + total += n; + bytesleft -= n; } - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_INET; // only IPv4 - hints.ai_socktype = SOCK_STREAM; // TCP connection - hints.ai_flags = AI_PASSIVE; // use my IP + *len = total; // return number actually sent here + + return n==-1?-1:0; // return -1 on failure, 0 on success +} + +void udp_port_cleanup_handler(void *arg) +{ + int sockfd = (int) arg; + close(sockfd); +} + +/* Make the manager routine */ +void *send_udp_packet_routine(void *arg) { + // printf("send udp packet routine\n"); + int *buf = arg; + // unpack args + int user_index = buf[0]; + int buffer_size = buf[1]; + char *file_buffer = malloc(buffer_size); + memcpy(file_buffer, buf+2, buffer_size); + + // printf("udp packet routine, user:%d\n size: %d\n", user_index, buffer_size); + + // declare vairables to be used + int did_work = 1; + pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; + int s; + int udp_sockfd; + struct addrinfo thread_hints, *thread_res, *thread_servinfo; + int error_code; + + // setup hints + memset(&thread_hints, 0, sizeof thread_hints); + thread_hints.ai_family = AF_INET; // use IPv4 only + thread_hints.ai_socktype = SOCK_DGRAM; + thread_hints.ai_flags = AI_PASSIVE; // fill in my IP for me + + int int_port = user_data[user_index].udpPort; + int length = snprintf( NULL, 0, "%d", int_port ); + char* port = malloc( length + 1 ); + if (!port) { perror("malloc on port"); return (NULL); } + snprintf(port, length + 1, "%d", int_port); + sprintf(port, "%d", int_port); + + if (error_code = getaddrinfo(NULL, port, &thread_hints, &thread_servinfo) != 0) + { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error_code)); + return (NULL); + } + free(port); + + // loop through all the results and make a socket + for(thread_res = thread_servinfo; thread_res != NULL; thread_res = thread_res->ai_next) { + if ((udp_sockfd = socket(thread_res->ai_family, thread_res->ai_socktype, + thread_res->ai_protocol)) == -1) { + perror("talker: socket"); + continue; + } + break; + } + if (udp_sockfd == NULL) { + fprintf(stderr, "talker: failed to create socket\n"); + return (NULL); + } - const char* port = argv[1]; // the port users will be connecting to + pthread_cleanup_push(udp_port_cleanup_handler, (void *)udp_sockfd); + // wait for + pthread_mutex_lock(&m); + did_work = 0; + while (!start_threads) + { + pthread_cond_wait(&cond, &m); + } - if ((rv = getaddrinfo(NULL, port, &hints, &servinfo)) != 0) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); - return 1; + int station_num = user_data[user_index].stationNum; + if (station_num == -1) { + did_work = 1; } + // potential error here! + // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize); + + if (send_all_udp(udp_sockfd, file_buffer, &buffer_size, thread_res) == -1) + { + perror("send_all_udp"); + printf("We only sent %d bytes because of the error!\n", buffer_size); + } + + free(file_buffer); + + pthread_mutex_unlock(&m); + + pthread_cleanup_pop(1); + + return (NULL); +} - // loop through all the results and bind to the first we can - for(p = servinfo; p != NULL; p = p->ai_next) { - if ((sockfd = socket(p->ai_family, p->ai_socktype, - p->ai_protocol)) == -1) { - perror("server: socket"); +void *send_announce_routine(void *arg) { + // unpack args + int station_num = (int) arg; + // send the announce messages + for (int i = 0; i < max_active_users; i++) + { + if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1) { continue; } + // update the station of each user + if (user_data[i].stationNum == station_num) + { + send_announce_reply(user_data[i].sockfd, station_num); + } + } +} + +// void *sync_routine(void *arg) { +// int c = 0; +// while (1) +// { +// start_threads = 1; +// pthread_cond_broadcast(&cond); +// usleep(2000); + +// start_threads = 0; + +// usleep(1000000-2000); +// } +// } + +void *select_routine(void *arg) { + fd_set master; // master file descriptor list + fd_set read_fds; // temp file descriptor list for select() + int fdmax; // maximum file descriptor number + + int listener; // listening socket descriptor + int newfd; // newly accept()ed socket descriptor + struct sockaddr_storage remoteaddr; // client address + socklen_t addrlen; + + char buf[256]; // buffer for client data + int nbytes; + + + char remoteIP[INET6_ADDRSTRLEN]; + + int yes=1; // for setsockopt() SO_REUSEADDR, below + int i, j, rv; - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &yes, - sizeof(int)) == -1) { - perror("setsockopt"); - exit(1); + struct addrinfo hints, *ai, *p; + + // const char* port = argv[1]; + + FD_ZERO(&master); // clear the master and temp sets + FD_ZERO(&read_fds); + + // LISTENER: get us a socket and bind it + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + if ((rv = getaddrinfo(NULL, port, &hints, &ai)) != 0) { + fprintf(stderr, "snowcast_server: %s\n", gai_strerror(rv)); + exit(1); + } + + for(p = ai; p != NULL; p = p->ai_next) { + listener = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (listener < 0) { + continue; } + + // lose the pesky "address already in use" error message + setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); - if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) { - close(sockfd); - perror("server: bind"); + if (bind(listener, p->ai_addr, p->ai_addrlen) < 0) { + close(listener); continue; } break; } - freeaddrinfo(servinfo); // all done with this structure - - if (p == NULL) { - fprintf(stderr, "server: failed to bind\n"); - exit(1); + // if we got here, it means we didn't get bound + if (p == NULL) { + fprintf(stderr, "snowcast_server: failed to bind\n"); + exit(2); } - if (listen(sockfd, BACKLOG) == -1) { + freeaddrinfo(ai); // all done with this + + // listen + if (listen(listener, 10) == -1) { perror("listen"); - exit(1); + exit(3); } - sa.sa_handler = sigchld_handler; // reap all dead processes - sigemptyset(&sa.sa_mask); - sa.sa_flags = SA_RESTART; - if (sigaction(SIGCHLD, &sa, NULL) == -1) { - perror("sigaction"); - exit(1); - } + // add the listener to the master set + FD_SET(listener, &master); - printf("server: waiting for connections...\n"); + // keep track of the biggest file descriptor + fdmax = listener; // so far, it's this one - while(1) { // main accept() loop - sin_size = sizeof their_addr; - new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); - if (new_fd == -1) { - perror("accept"); - continue; + while(1) { + read_fds = master; // copy it + if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) { + perror("select"); + exit(4); } - inet_ntop(their_addr.ss_family, - get_in_addr((struct sockaddr *)&their_addr), - s, sizeof s); - printf("server: got connection from %s\n", s); + // run through the existing connections looking for data to read + for(i = 0; i <= fdmax; i++) { + if (FD_ISSET(i, &read_fds)) { // we got one!! + if (i == listener) { + // handle new connections + addrlen = sizeof remoteaddr; + newfd = accept(listener, + (struct sockaddr *)&remoteaddr, + &addrlen); - // make a struct for the message, number is the number of stations - struct Hello hello; - if ((recv(new_fd, &hello, sizeof(struct Hello), 0)) == -1) - { - perror("send"); - close(new_fd); - exit(0); + if (newfd == -1) { + perror("accept"); + } else { + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = TIMEOUT; + if (setsockopt(newfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { + perror("setsockopt"); + } + + uint8_t command_type = -1; + if ((nbytes = recv(newfd, &command_type, 1, 0)) <= 0) + { + // got error or connection closed by client + if (nbytes == 0) { + // connection closed + // printf("selectserver: socket %d hung up\n", i); + } else { + perror("recv"); + } + // remove user from data structures + close(newfd); + // destroy_user(i); + continue; + } + + if (command_type != 0) { + char * message = "must send a Hello message first"; + send_invalid_command_reply(newfd, strlen(message), message); + close(newfd); + continue; + } + + // get the udp port + uint16_t udp_port = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(newfd, &udp_port, &bytes_to_read) == -1) { + perror("recv_all"); + close(newfd); + // destroy_user(i); + continue; + } + udp_port = ntohs(udp_port); + + FD_SET(newfd, &master); // add to master set + if (newfd > fdmax) { // keep track of the max + fdmax = newfd; + } + init_user(newfd); + update_user_udpPort(newfd, udp_port); + + // send the welcome message to client + struct Welcome welcome; + welcome.replyType = 2; + welcome.numStations = htons(num_stations); + int bytes_to_send = sizeof(struct Welcome); + if (send_all(newfd, &welcome, &bytes_to_send) == -1) + perror("send_all"); + } + } else { + // handle data from a client + uint8_t command_type = -1; + if ((nbytes = recv(i, &command_type, 1, 0)) <= 0) + { + // got error or connection closed by client + if (nbytes == 0) { + // connection closed + // printf("selectserver: socket %d hung up\n", i); + } else { + perror("recv"); + } + close(i); // bye! + FD_CLR(i, &master); // remove from master set + // remove user from data structures + destroy_user(i); + } + else + { + // we got some data from a client + if (command_type == 0) { // we got a Hello commmand + + // get the udp port + uint16_t udp_port = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(i, &udp_port, &bytes_to_read) == -1) { + perror("recv_all"); + close(i); + FD_CLR(i, &master); + destroy_user(i); + continue; + } + + if (user_data[sockfd_to_user[i]].udpPort != -1) { + // send back in invalid command + char * message = "must not sent more than one Hello message"; + send_invalid_command_reply(i, strlen(message), message); + // drop connection upon invalid command + close(i); + FD_CLR(i, &master); + destroy_user(i); + continue; + } + } + else if (command_type == 1) { // we got a SetStation command + // get the station number + + uint16_t station_number = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(i, &station_number, &bytes_to_read) == -1) { + perror("recv_all"); + close(i); + FD_CLR(i, &master); + destroy_user(i); + continue; + } + station_number = ntohs(station_number); + + // check if user has a udpPort to stream to + if (user_data[sockfd_to_user[i]].udpPort == -1) { + // send back in invalid command + char * message = "must send Hello message first"; + send_invalid_command_reply(i, strlen(message), message); + // drop connection upon invalid command + close(i); + FD_CLR(i, &master); + destroy_user(i); + continue; + } + + // check if station num is in range + if (station_number >= num_stations || station_number < 0) { + // send back in invalid command + char * message = "station number out of range"; + send_invalid_command_reply(i, strlen(message), message); + // drop connection upon invalid command + close(i); + FD_CLR(i, &master); + destroy_user(i); + continue; + } + + // printf("setting station to %d\n", ntohs(command.number)); + // update station of user + update_user_station(i, station_number); + send_announce_reply(i, station_number); + } + else { + // send back in invalid command + char * message = "invalid command"; + send_invalid_command_reply(i, strlen(message), message); + + // drop connection upon invalid command + close(i); + FD_CLR(i, &master); + destroy_user(i); + } + } + } // END handle data from client + } // END got new incoming connection + } // END looping through file descriptors + + } // END for(;;)--and you thought it would never end! +} + +void *init_user(int sockfd) { + // add the user to the list of user data + pthread_mutex_lock(&mutex_user_data); + + if(sockfd > max_sockfd) { + max_sockfd = sockfd; + int * more_sockfd_to_user = realloc(sockfd_to_user, sizeof(int) * (max_sockfd + 1)); + if (!more_sockfd_to_user) { perror("realloc"); exit(1); } + sockfd_to_user = more_sockfd_to_user; + } + + int running_index = 0; + while(running_index < max_active_users) { + if (user_data[running_index].sockfd == -1) { + break; } + running_index++; + } + if (running_index == max_active_users) { + // printf("reached max active users\n"); + // printf("making new memory\n"); + max_active_users++; + user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users); + if (!more_users) { perror("realloc"); exit(1); } + user_data = more_users; + } + + // map TCP sockfd to this user index + user_data[running_index] = (user_t){-1, -1, sockfd}; + sockfd_to_user[sockfd] = running_index; + // free(user_stream_threads); + pthread_mutex_unlock(&mutex_user_data); +} +void *update_user_udpPort(int sockfd, int udpPort) { + pthread_mutex_lock(&mutex_user_data); + // get the user + user_t *user = &user_data[sockfd_to_user[sockfd]]; + // set the udpPort + user->udpPort = udpPort; + // start the stream thread, now that we have the udpPort + // pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]); + pthread_mutex_unlock(&mutex_user_data); +} +void *update_user_station(int sockfd, int stationNum) { + pthread_mutex_lock(&mutex_user_data); + user_data[sockfd_to_user[sockfd]].stationNum = stationNum; + pthread_mutex_unlock(&mutex_user_data); +} +void *print_user_data(int index) { + printf("udpPort: %d, stationNum: %d, sockfd: %d\n", + user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd); +} + +void destroy_user(int sockfd) { + pthread_mutex_lock(&mutex_user_data); + // stop the thread streaming to the user + // pthread_t thread = user_data[sockfd_to_user[sockfd]].streamThread; + // if (thread != -1) { + // pthread_cancel(thread); + // } + // "remove" the user from the list of user data + user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1}; + // map sockfd to -1 + sockfd_to_user[sockfd] = -1; - // make a struct for the message, number is the number of stations - struct Welcome welcome; - welcome.replyType = 2; - welcome.numStations = htons(argc - 2); - if ((send(new_fd, &welcome, sizeof(struct Welcome), 0)) == -1) - perror("send"); - close(new_fd); - exit(0); + pthread_mutex_unlock(&mutex_user_data); +} + + +void *get_in_addr(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); } - return 0; + return &(((struct sockaddr_in6*)sa)->sin6_addr); +} + +void send_announce_reply(int fd, int station_num) { + char* file_path = stations[station_num].filePath; + int len_file_path = strlen(file_path); + + char *send_buffer = malloc(len_file_path+2); + if (!send_buffer) { + perror("malloc in send announce"); + return; + } + send_buffer[0] = 3; + send_buffer[1] = len_file_path; + + memcpy(send_buffer + 2, file_path, len_file_path); + + size_t bytes_to_send = len_file_path + 2; + if (send_all(fd, send_buffer, &bytes_to_send) == -1) + perror("send_all"); + + free(send_buffer); +} + +void send_invalid_command_reply(int fd, size_t message_size, char* message) { + char *send_buffer = malloc(message_size+2); + if (!send_buffer) { + perror("malloc in send invalid command"); + return; + } + + // type and payload size + send_buffer[0] = 4; + send_buffer[1] = message_size; + + memcpy(send_buffer + 2, message, message_size); + + int bytes_to_send = message_size + 2; + if (send_all(fd, send_buffer, &bytes_to_send) == -1) + perror("send"); + + free(send_buffer); +} + +// void init_station(int station_num, const char* station_name) { +// station_t *station = &station_data[station_num]; + +// // open the file +// int stream_fd = open(station_name, O_RDONLY); +// if (stream_fd == -1) { +// perror("open"); +// return; +// } +// station->streamFd = stream_fd; +// station->filePath = station_name; + +// // setup file buffer +// char stream_buffer[MAX_STREAM_RATE]; +// memset(stream_buffer, 0, MAX_STREAM_RATE); + +// station->fileBufferSize = MAX_STREAM_RATE; +// memcpy(&station->fileBufferSize, stream_buffer, MAX_STREAM_RATE); + + +// // load the first buffer into the stations +// seek_stations(station_num); +// } + +// void seek_stations(int station_num) { +// station_t *station_info = &station_data[station_num]; +// memset(&station_info->fileBuffer, 0, MAX_STREAM_RATE); +// int bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE); +// lseek(station_info->streamFd, -16, SEEK_SET); +// // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize); + +// // time to restart the file +// if (bytes_read == 0) { +// if (lseek(station_info->streamFd, 0, SEEK_SET) == -1) { +// perror("fseek"); +// } +// pthread_t send_announce_thread; +// pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)station_num); + +// // load first chunk +// bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE); +// } +// station_info->fileBufferSize = bytes_read; +// } + + +// Parses a buffer into tokens, from cs33 :) +int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) { + const char *regex = " \n\t\f\r"; + char *current_token = strtok(buffer, regex); + if (current_token == NULL) return 0; + + for (int i = 0; current_token != NULL; i++) { + tokens[i] = current_token; + current_token = strtok(NULL, regex); + } + + return 1; }
\ No newline at end of file |