#include "../../../plugins/youtube/YoutubeMediaProxy.hpp" #include "../../../include/NetUtils.hpp" #include #include #include #include #include #include #include #include #include #include // TODO: What if the client sends a new header without reconnecting? is that even allowed by http standard? // TODO: Detect when download has finished (and close connection). namespace QuickMedia { static const int MAX_BUFFER_SIZE = 65536; static const int64_t RANGE = 524287; static const int64_t THROTTLED_DOWNLOAD_LIMIT_KB = 80; // TODO: What about people with really slow internet? What if the video player cache is not working and download is stuck, leading to false download speed calculation? static const int64_t THROTTLED_DURATION_SECS = 3; static const char download_error_response_msg[] = "HTTP/1.1 500 Internal Server Error\r\n" "Content-Length: 0\r\n\r\n"; static bool set_non_blocking(int fd) { const int flags = fcntl(fd, F_GETFL, 0); if(fd == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) return false; return true; } // TODO: Restrict range end to remote file size (content-length which we have saved). // TODO: Check if custom youtube redirect code is needed bool YoutubeMediaProxy::start_download(const std::string &media_url, ReadProgram &read_program, int64_t range_start, bool include_header, bool is_livestream, int livestream_sequence) { std::string r = std::to_string(range_start) + "-" + std::to_string(range_start + RANGE); std::string url = media_url + "&rn=" + std::to_string(rn) + "&rbuf=" + std::to_string(rbuf); std::vector args = { "curl", //"-H", "Accept-Language: en-US,en;q=0.5", "-H", "Connection: keep-alive", "--no-buffer", "-H", "user-agent: Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36", "-g", "-s", "-L", "-f" }; if(is_livestream) { if(livestream_sequence != -1) url += "&sq=" + std::to_string(livestream_sequence); } else { args.insert(args.end(), { "-r", r.c_str() }); } if(include_header) args.push_back("-i"); //fprintf(stderr, "url: %s\n", url.c_str()); args.insert(args.end(), { "--", url.c_str(), nullptr }); if(exec_program_pipe(args.data(), &read_program) != 0) return false; if(!set_non_blocking(read_program.read_fd)) { perror("start_download: failed to set curl pipe non blocking mode"); close(read_program.read_fd); kill(read_program.pid, SIGTERM); read_program.read_fd = -1; read_program.pid = -1; return false; } ++rn; rbuf += 3000; if(rbuf > 75000) rbuf = 75000; return true; } YoutubeStaticMediaProxy::YoutubeStaticMediaProxy(ThrottleHandler throttle_handler) : throttle_handler(std::move(throttle_handler)) { } YoutubeStaticMediaProxy::~YoutubeStaticMediaProxy() { stop(); } bool YoutubeStaticMediaProxy::start(const std::string &youtube_media_url, int64_t content_length) { if(socket_fd != -1) return false; socket_fd = socket(AF_INET, SOCK_STREAM, 0); if(socket_fd == -1) { perror("YoutubeStaticMediaProxy::start: socket failed"); return false; } socklen_t response_sock_addr_len; struct sockaddr_in server_addr; memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); server_addr.sin_port = htons(0); if(bind(socket_fd, (struct sockaddr*)&server_addr, sizeof(server_addr)) == -1) { perror("YoutubeStaticMediaProxy::start: bind failed"); goto err; } if(listen(socket_fd, 2) == -1) { perror("YoutubeStaticMediaProxy::start: listen failed"); goto err; } if(!set_non_blocking(socket_fd)) { perror("YoutubeStaticMediaProxy::start: failed to set socket non blocking mode"); goto err; } struct sockaddr_in response_sock_addr; response_sock_addr_len = sizeof(response_sock_addr); if(getsockname(socket_fd, (struct sockaddr*)&response_sock_addr, &response_sock_addr_len) == -1) { perror("YoutubeStaticMediaProxy::start: getsockname failed"); goto err; } port = ntohs(response_sock_addr.sin_port); this->youtube_media_url = youtube_media_url; this->content_length = content_length; return true; err: if(downloader_read_program.read_fd != -1) close(downloader_read_program.read_fd); if(downloader_read_program.pid != -1) kill(downloader_read_program.pid, SIGTERM); close(socket_fd); socket_fd = -1; return false; } void YoutubeStaticMediaProxy::stop() { if(downloader_read_program.read_fd != -1) { close(downloader_read_program.read_fd); downloader_read_program.read_fd = -1; } if(downloader_read_program.pid != -1) { kill(downloader_read_program.pid, SIGTERM); wait_program(downloader_read_program.pid); downloader_read_program.pid = -1; } if(client_fd != -1) { close(client_fd); client_fd = -1; } if(socket_fd != -1) { close(socket_fd); socket_fd = -1; } clear_download_state(); client_request_buffer.clear(); client_request_finished = false; } YoutubeStaticMediaProxy::Error YoutubeStaticMediaProxy::update() { if(socket_fd == -1) return Error::OK; if(client_fd == -1) { client_fd = accept_client(); if(client_fd == -1) return Error::OK; } else { const int new_client_fd = accept_client(); if(new_client_fd != -1) { on_client_disconnect(); client_fd = new_client_fd; } } Error err = read_client_data(); if(err != Error::OK || !client_request_finished) return err; if(downloader_read_program.pid == -1) return Error::ERROR; return handle_download(); } bool YoutubeStaticMediaProxy::get_address(std::string &address) { if(socket_fd == -1) return false; address = "http://127.0.0.1:" + std::to_string(port); return true; } void YoutubeStaticMediaProxy::on_client_disconnect() { client_request_buffer.clear(); client_request_finished = false; download_started = false; throttle_started = false; if(client_fd != -1) { close(client_fd); client_fd = -1; } if(downloader_read_program.pid != -1) kill(downloader_read_program.pid, SIGTERM); update_download_program_status(true); } // Returns 0 if start range is not found static int64_t header_extract_start_range(const std::string &header) { std::string range = header_extract_value(header, "range"); if(range.empty()) return 0; int64_t start_range = 0; if(sscanf(range.c_str(), " bytes=%lld", &start_range) != 1) return 0; return start_range; } // TODO: What about hls (live streams)? need to test with that. There may not be a need to use YoutubeMediaProxy for that case (in that case document it). YoutubeStaticMediaProxy::Error YoutubeStaticMediaProxy::read_client_data() { if(client_request_finished) return Error::OK; char read_buffer[4096]; const ssize_t num_bytes_read = read(client_fd, read_buffer, sizeof(read_buffer)); if(num_bytes_read == -1) { const int err = errno; if(err == EAGAIN || err == EWOULDBLOCK) { return Error::OK; } else if(err == EPIPE || err == ECONNRESET) { //fprintf(stderr, "YoutubeStaticMediaProxy::read_client_data: client disconnected\n"); on_client_disconnect(); return Error::ERROR; } else { perror("YoutubeStaticMediaProxy::read_client_data: read failed"); return Error::ERROR; } } else if(num_bytes_read == 0) { //fprintf(stderr, "YoutubeStaticMediaProxy::read_client_data: client disconnected\n"); on_client_disconnect(); return Error::ERROR; } client_request_buffer.append(read_buffer, num_bytes_read); const size_t header_end = client_request_buffer.find("\r\n\r\n"); if(header_end != std::string::npos) { client_request_buffer.erase(header_end + 4); client_request_finished = true; download_started = false; throttle_started = false; download_read_buffer_offset = 0; const int64_t new_start_range = header_extract_start_range(client_request_buffer); if(downloader_read_program.pid != -1) { kill(downloader_read_program.pid, SIGTERM); wait_program(downloader_read_program.pid); downloader_read_program.pid = -1; } clear_download_state(); return update_download_program_status(false, new_start_range, true); } else { if(client_request_buffer.size() > MAX_BUFFER_SIZE) { client_request_finished = true; fprintf(stderr, "YoutubeStaticMediaProxy::read_client_data: buffer is full (malicious client?)\n"); return Error::ERROR; } } return Error::OK; } void YoutubeStaticMediaProxy::clear_download_state() { download_header.clear(); download_header_finished = false; download_header_sent = false; download_header_remaining_sent = false; download_header_written_offset = 0; download_read_buffer_offset = 0; } YoutubeStaticMediaProxy::Error YoutubeStaticMediaProxy::update_download_program_status(bool client_disconnected, int64_t new_range_start, bool restart_download) { int program_status = 0; if(downloader_read_program.pid != -1) { if(client_disconnected) { wait_program(downloader_read_program.pid); } else { if(wait_program_non_blocking(downloader_read_program.pid, &program_status) == 0 && program_status == 0) return Error::OK; } downloader_read_program.pid = -1; } if(downloader_read_program.read_fd != -1) { close(downloader_read_program.read_fd); downloader_read_program.read_fd = -1; } // TODO: Why is this not 0 when download finishes? if(program_status != 0) { //fprintf(stderr, "YoutubeStaticMediaProxy::update_download_program_status: download failed, exit status: %d\n", program_status); if(client_fd != -1) { write(client_fd, download_error_response_msg, sizeof(download_error_response_msg) - 1); close(client_fd); client_fd = -1; client_request_buffer.clear(); client_request_finished = false; } return Error::ERROR; } if(client_disconnected) { current_download_range = 0; } else { current_download_range += RANGE + 1; } if(new_range_start != -1) { download_range_start = new_range_start; current_download_range = download_range_start; } if(new_range_start == -1) { download_header_finished = true; download_header_sent = true; download_header_remaining_sent = true; } else { clear_download_state(); } if(client_disconnected) { clear_download_state(); return Error::ERROR; } if(!restart_download) return Error::OK; if(new_range_start == -1) { download_header_finished = true; download_header_sent = true; download_header_remaining_sent = true; } const bool start_download_success = start_download(youtube_media_url, downloader_read_program, current_download_range, new_range_start != -1); if(!start_download_success) { fprintf(stderr, "YoutubeStaticMediaProxy::update_download_program_status: failed to start download\n"); if(client_fd != -1) { write(client_fd, download_error_response_msg, sizeof(download_error_response_msg) - 1); close(client_fd); client_fd = -1; client_request_buffer.clear(); client_request_finished = false; } clear_download_state(); return Error::ERROR; } return Error::OK; } static void header_replace_content_length(std::string &header, size_t header_size, int64_t new_content_length) { if(new_content_length < 0) new_content_length = 0; const char *content_length_p = strcasestr(header.c_str(), "content-length:"); if(!content_length_p) return; const size_t content_length_start = (content_length_p + 15) - header.c_str(); if(content_length_start >= header_size) return; const size_t line_end = header.find("\r\n", content_length_start); header.replace(content_length_start, line_end - content_length_start, std::to_string(new_content_length)); } YoutubeStaticMediaProxy::Error YoutubeStaticMediaProxy::continue_send(const char *buffer_start, size_t total_bytes_to_write, int &buffer_offset) { int num_bytes_to_write = total_bytes_to_write - buffer_offset; //assert(num_bytes_to_write >= 0); if(num_bytes_to_write < 0) num_bytes_to_write = 0; if(num_bytes_to_write == 0) { buffer_offset = 0; return Error::OK; } const ssize_t num_bytes_written = write(client_fd, buffer_start + buffer_offset, num_bytes_to_write); if(num_bytes_written == -1) { const int err = errno; if(err == EAGAIN || err == EWOULDBLOCK) { return Error::OK; } else if(err == EPIPE || err == ECONNRESET) { //fprintf(stderr, "YoutubeStaticMediaProxy::continue_send: client disconnected\n"); on_client_disconnect(); return Error::ERROR; } else { perror("YoutubeStaticMediaProxy::continue_send: write failed"); return Error::ERROR; } } else if(num_bytes_written == 0) { //fprintf(stderr, "YoutubeStaticMediaProxy::continue_send: client disconnected\n"); on_client_disconnect(); return Error::ERROR; } else if(num_bytes_written != num_bytes_to_write) { buffer_offset += num_bytes_written; } else { buffer_offset = 0; } return Error::OK; } static bool http_is_redirect(const char *header, size_t size) { const void *end_of_first_line_p = memmem(header, size, "\r\n", 2); if(!end_of_first_line_p) return false; return memmem(header, (const char*)end_of_first_line_p - header, " 30", 3) != nullptr; } static size_t find_start_of_first_non_redirect_header(const char *headers, size_t size, size_t &header_end) { const char *start = headers; while(size > 0) { const void *end_of_header = memmem(headers, size, "\r\n\r\n", 4); if(!end_of_header) return std::string::npos; const size_t offset_to_end_of_headers = ((const char*)end_of_header + 4) - headers; if(!http_is_redirect(headers, offset_to_end_of_headers)) { header_end = (headers - start) + offset_to_end_of_headers; return headers - start; } headers += offset_to_end_of_headers; size -= offset_to_end_of_headers; } return std::string::npos; } YoutubeStaticMediaProxy::Error YoutubeStaticMediaProxy::handle_download() { // TODO: Maybe read even if write is being slow and failing? if(download_read_buffer_offset == 0) { downloader_num_read_bytes = read(downloader_read_program.read_fd, download_read_buffer, sizeof(download_read_buffer)); if(downloader_num_read_bytes == -1) { const int err = errno; if(err == EAGAIN || err == EWOULDBLOCK) { return Error::OK; } else { perror("YoutubeStaticMediaProxy::handle_download: curl read failed"); return Error::ERROR; } } else if(downloader_num_read_bytes == 0) { Error err = update_download_program_status(false, -1, true); if(err != Error::OK) return err; } else { if(!download_started) { total_downloaded_bytes = 0; download_started = true; throttle_started = false; struct timespec tp; clock_gettime(CLOCK_MONOTONIC, &tp); download_start_time = tp.tv_sec; } total_downloaded_bytes += downloader_num_read_bytes; } } #if 0 if(download_started) { struct timespec tp; clock_gettime(CLOCK_MONOTONIC, &tp); int64_t time_elapsed = tp.tv_sec - download_start_time; int64_t download_speed_kb_sec = 0; if(time_elapsed > 0) download_speed_kb_sec = (total_downloaded_bytes / time_elapsed) / 1024; if(download_speed_kb_sec < THROTTLED_DOWNLOAD_LIMIT_KB) { if(throttle_started) { if(tp.tv_sec - throttle_start_time >= THROTTLED_DURATION_SECS && !throttle_callback_called) { total_downloaded_bytes = 0; download_started = false; throttle_started = false; throttle_callback_called = true; if(throttle_handler) throttle_handler(); } } else { throttle_started = true; throttle_start_time = tp.tv_sec; } } else { throttle_started = false; } } #endif // TODO: Remove this code and instead create the header ourselves and send it to the client. Then pipe the curl output directly to the client input. if(!download_header_finished) { download_header.append(download_read_buffer, downloader_num_read_bytes); size_t header_end = std::string::npos; const size_t offset_to_start_of_header = find_start_of_first_non_redirect_header(download_header.c_str(), download_header.size(), header_end); if(header_end != std::string::npos) { download_header.erase(0, offset_to_start_of_header); header_end -= offset_to_start_of_header; download_header_finished = true; download_header_sent = false; download_header_remaining_sent = false; download_header_written_offset = 0; download_header_offset_to_end_of_header = header_end; download_read_buffer_offset = -1; header_replace_content_length(download_header, header_end, content_length - download_range_start); } else { if(download_header.size() > MAX_BUFFER_SIZE) { fprintf(stderr, "YoutubeStaticMediaProxy::handle_download: buffer is full (malicious server?)\n"); if(downloader_read_program.pid != -1) { kill(downloader_read_program.pid, SIGTERM); wait_program(downloader_read_program.pid); downloader_read_program.pid = -1; } clear_download_state(); update_download_program_status(false, 0, false); return Error::ERROR; } } } if(download_header_finished && !download_header_sent) { Error err = continue_send(download_header.data(), download_header_offset_to_end_of_header, download_header_written_offset); if(err != Error::OK) return err; if(download_header_written_offset == 0) { download_header_sent = true; download_header_written_offset = download_header_offset_to_end_of_header; } } if(download_header_finished && !download_header_remaining_sent) { Error err = continue_send(download_header.data(), download_header.size(), download_header_written_offset); if(err != Error::OK) return err; if(download_header_written_offset == 0) { download_header_remaining_sent = true; download_read_buffer_offset = 0; return Error::OK; } } if(download_header_remaining_sent) return continue_send(download_read_buffer, downloader_num_read_bytes, download_read_buffer_offset); return Error::OK; } int YoutubeStaticMediaProxy::accept_client() { struct sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); int new_client_fd = accept(socket_fd, (struct sockaddr*)&client_addr, &client_addr_len); if(new_client_fd == -1) { const int err = errno; if(err == EAGAIN || err == EWOULDBLOCK) { return -1; } else { perror("YoutubeStaticMediaProxy::accept_client accept failed"); return -1; } } if(!set_non_blocking(new_client_fd)) { perror("YoutubeStaticMediaProxy::accept_client: failed to set client socket non blocking mode"); close(new_client_fd); return -1; } //fprintf(stderr, "YoutubeStaticMediaProxy::accept_client: client connected!\n"); return new_client_fd; } YoutubeLiveStreamMediaProxy::~YoutubeLiveStreamMediaProxy() { stop(); } bool YoutubeLiveStreamMediaProxy::start(const std::string &youtube_media_url, int64_t) { fd[0] = -1; fd[1] = -1; if(pipe(fd) == -1) { perror("YoutubeLiveStreamMediaProxy::start: failed to open pipe"); return false; } //if(socketpair(AF_UNIX, SOCK_STREAM, 0, fd) == -1) { // perror("YoutubeLiveStreamMediaProxy::start: failed to open pipe"); // return false; //} for(int i = 0; i < 2; ++i) { if(!set_non_blocking(fd[i])) { stop(); return false; } } if(!start_download(youtube_media_url, downloader_read_program, 0, true, true)) { stop(); return false; } this->youtube_media_url = youtube_media_url; return true; } void YoutubeLiveStreamMediaProxy::stop() { for(int i = 0; i < 2; ++i) { if(fd[i] != -1) { close(fd[i]); fd[i] = -1; } } if(downloader_read_program.read_fd != -1) { close(downloader_read_program.read_fd); downloader_read_program.read_fd = -1; } if(downloader_read_program.pid != -1) { kill(downloader_read_program.pid, SIGTERM); wait_program(downloader_read_program.pid); downloader_read_program.pid = -1; } } YoutubeMediaProxy::Error YoutubeLiveStreamMediaProxy::update_download_program_status() { int program_status = 0; if(wait_program_non_blocking(downloader_read_program.pid, &program_status) == 0 && program_status == 0) return Error::OK; downloader_read_program.pid = -1; if(downloader_read_program.read_fd != -1) { close(downloader_read_program.read_fd); downloader_read_program.read_fd = -1; } // TODO: Why is this not 0 when download finishes? if(program_status != 0) { //fprintf(stderr, "YoutubeLiveStreamMediaProxy::update_download_program_status: download failed, exit status: %d\n", program_status); stop(); return Error::ERROR; } ++livestream_sequence_num; const bool start_download_success = start_download(youtube_media_url, downloader_read_program, 0, false, true, livestream_sequence_num); if(!start_download_success) { fprintf(stderr, "YoutubeLiveStreamMediaProxy::update_download_program_status: failed to start download\n"); stop(); return Error::ERROR; } return Error::OK; } YoutubeMediaProxy::Error YoutubeLiveStreamMediaProxy::continue_send(const char *buffer_start, size_t total_bytes_to_write, int &buffer_offset) { int num_bytes_to_write = total_bytes_to_write - buffer_offset; //assert(num_bytes_to_write >= 0); if(num_bytes_to_write < 0) num_bytes_to_write = 0; if(num_bytes_to_write == 0) { buffer_offset = 0; return Error::OK; } const ssize_t num_bytes_written = write(fd[1], buffer_start + buffer_offset, num_bytes_to_write); if(num_bytes_written == -1) { const int err = errno; if(err == EAGAIN || err == EWOULDBLOCK) { return Error::OK; } else if(err == EPIPE || err == ECONNRESET) { //fprintf(stderr, "YoutubeLiveStreamMediaProxy::continue_send: client disconnected\n"); stop(); return Error::ERROR; } else { perror("YoutubeLiveStreamMediaProxy::continue_send: write failed"); return Error::ERROR; } } else if(num_bytes_written == 0) { //fprintf(stderr, "YoutubeLiveStreamMediaProxy::continue_send: client disconnected\n"); stop(); return Error::ERROR; } else if(num_bytes_written != num_bytes_to_write) { buffer_offset += num_bytes_written; } else { buffer_offset = 0; } return Error::OK; } YoutubeMediaProxy::Error YoutubeLiveStreamMediaProxy::update() { if(fd[1] == -1 || downloader_read_program.read_fd == -1) return Error::OK; if(download_read_buffer_offset == 0) { downloader_num_read_bytes = read(downloader_read_program.read_fd, download_read_buffer, sizeof(download_read_buffer)); if(downloader_num_read_bytes == -1) { const int err = errno; if(err == EAGAIN || err == EWOULDBLOCK) { return Error::OK; } else { perror("YoutubeLiveStreamMediaProxy::update: curl read failed"); return Error::ERROR; } } else if(downloader_num_read_bytes == 0) { Error err = update_download_program_status(); if(err != Error::OK) return err; } } if(!download_header_finished) { download_header.append(download_read_buffer, downloader_num_read_bytes); size_t header_end = std::string::npos; const size_t offset_to_start_of_header = find_start_of_first_non_redirect_header(download_header.c_str(), download_header.size(), header_end); if(header_end != std::string::npos) { download_header.erase(0, offset_to_start_of_header); header_end -= offset_to_start_of_header; download_header_finished = true; download_header_remaining_sent = false; download_header_written_offset = header_end; download_read_buffer_offset = -1; fprintf(stderr, "header: |%.*s|\n", download_header_written_offset, download_header.c_str()); if(livestream_sequence_num == -1) { // TODO: What about |header_end|? std::string sequence_num = header_extract_value(download_header, "x-sequence-num"); fprintf(stderr, "server sequence num: |%s|\n", sequence_num.c_str()); if(sequence_num.empty()) fprintf(stderr, "YoutubeLiveStreamMediaProxy::update: missing sequence num from server\n"); else livestream_sequence_num = strtoll(sequence_num.c_str(), nullptr, 10); } } else { if(download_header.size() > MAX_BUFFER_SIZE) { fprintf(stderr, "YoutubeLiveStreamMediaProxy::update: buffer is full (malicious server?)\n"); if(downloader_read_program.pid != -1) { kill(downloader_read_program.pid, SIGTERM); wait_program(downloader_read_program.pid); downloader_read_program.pid = -1; } download_header_finished = true; return Error::ERROR; } } } if(download_header_finished && !download_header_remaining_sent) { Error err = continue_send(download_header.data(), download_header.size(), download_header_written_offset); if(err != Error::OK) return err; if(download_header_written_offset == 0) { download_header_remaining_sent = true; download_read_buffer_offset = 0; return Error::OK; } } if(download_header_remaining_sent) return continue_send(download_read_buffer, downloader_num_read_bytes, download_read_buffer_offset); return Error::OK; } bool YoutubeLiveStreamMediaProxy::get_address(std::string &address) { if(fd[0] == -1) return false; address = "fd://" + std::to_string(fd[0]); return true; } }