diff options
Diffstat (limited to 'src/replay_buffer')
-rw-r--r-- | src/replay_buffer/replay_buffer.c | 91 | ||||
-rw-r--r-- | src/replay_buffer/replay_buffer_disk.c | 429 | ||||
-rw-r--r-- | src/replay_buffer/replay_buffer_ram.c | 256 |
3 files changed, 776 insertions, 0 deletions
diff --git a/src/replay_buffer/replay_buffer.c b/src/replay_buffer/replay_buffer.c new file mode 100644 index 0000000..92aa645 --- /dev/null +++ b/src/replay_buffer/replay_buffer.c @@ -0,0 +1,91 @@ +#include "../../include/replay_buffer/replay_buffer.h" +#include "../../include/replay_buffer/replay_buffer_ram.h" +#include "../../include/replay_buffer/replay_buffer_disk.h" + +#include <stdlib.h> +#include <string.h> +#include <assert.h> + +gsr_replay_buffer* gsr_replay_buffer_create(gsr_replay_storage replay_storage, const char *replay_directory, double replay_buffer_time, size_t replay_buffer_num_packets) { + gsr_replay_buffer *replay_buffer = NULL; + switch(replay_storage) { + case GSR_REPLAY_STORAGE_RAM: + replay_buffer = gsr_replay_buffer_ram_create(replay_buffer_num_packets); + break; + case GSR_REPLAY_STORAGE_DISK: + replay_buffer = gsr_replay_buffer_disk_create(replay_directory, replay_buffer_time); + break; + } + + replay_buffer->mutex_initialized = false; + replay_buffer->original_replay_buffer = NULL; + if(pthread_mutex_init(&replay_buffer->mutex, NULL) != 0) { + gsr_replay_buffer_destroy(replay_buffer); + return NULL; + } + + replay_buffer->mutex_initialized = true; + return replay_buffer; +} + +void gsr_replay_buffer_destroy(gsr_replay_buffer *self) { + self->destroy(self); + if(self->mutex_initialized && !self->original_replay_buffer) { + pthread_mutex_destroy(&self->mutex); + self->mutex_initialized = false; + } + self->original_replay_buffer = NULL; + free(self); +} + +void gsr_replay_buffer_lock(gsr_replay_buffer *self) { + if(self->original_replay_buffer) { + gsr_replay_buffer_lock(self->original_replay_buffer); + return; + } + + if(self->mutex_initialized) + pthread_mutex_lock(&self->mutex); +} + +void gsr_replay_buffer_unlock(gsr_replay_buffer *self) { + if(self->original_replay_buffer) { + gsr_replay_buffer_unlock(self->original_replay_buffer); + return; + } + + if(self->mutex_initialized) + pthread_mutex_unlock(&self->mutex); +} + +bool gsr_replay_buffer_append(gsr_replay_buffer *self, const AVPacket *av_packet, double timestamp) { + return self->append(self, av_packet, timestamp); +} + +void gsr_replay_buffer_clear(gsr_replay_buffer *self) { + self->clear(self); +} + +AVPacket* gsr_replay_buffer_iterator_get_packet(gsr_replay_buffer *self, gsr_replay_buffer_iterator iterator) { + return self->iterator_get_packet(self, iterator); +} + +uint8_t* gsr_replay_buffer_iterator_get_packet_data(gsr_replay_buffer *self, gsr_replay_buffer_iterator iterator) { + return self->iterator_get_packet_data(self, iterator); +} + +gsr_replay_buffer* gsr_replay_buffer_clone(gsr_replay_buffer *self) { + return self->clone(self); +} + +gsr_replay_buffer_iterator gsr_replay_buffer_find_packet_index_by_time_passed(gsr_replay_buffer *self, int seconds) { + return self->find_packet_index_by_time_passed(self, seconds); +} + +gsr_replay_buffer_iterator gsr_replay_buffer_find_keyframe(gsr_replay_buffer *self, gsr_replay_buffer_iterator start_iterator, int stream_index, bool invert_stream_index) { + return self->find_keyframe(self, start_iterator, stream_index, invert_stream_index); +} + +bool gsr_replay_buffer_iterator_next(gsr_replay_buffer *self, gsr_replay_buffer_iterator *iterator) { + return self->iterator_next(self, iterator); +} diff --git a/src/replay_buffer/replay_buffer_disk.c b/src/replay_buffer/replay_buffer_disk.c new file mode 100644 index 0000000..0716f3b --- /dev/null +++ b/src/replay_buffer/replay_buffer_disk.c @@ -0,0 +1,429 @@ +#include "../../include/replay_buffer/replay_buffer_disk.h" +#include "../../include/utils.h" + +#include <stdlib.h> +#include <string.h> +#include <stdio.h> +#include <fcntl.h> +#include <unistd.h> +#include <time.h> +#include <errno.h> +#include <assert.h> + +#define REPLAY_BUFFER_FILE_SIZE_BYTES 1024 * 1024 * 256 /* 256MB */ +#define FILE_PREFIX "Replay" + +static void gsr_replay_buffer_disk_set_impl_funcs(gsr_replay_buffer_disk *self); + +static void gsr_av_packet_disk_init(gsr_av_packet_disk *self, const AVPacket *av_packet, size_t data_index, double timestamp) { + self->packet = *av_packet; + self->packet.data = NULL; + self->data_index = data_index; + self->timestamp = timestamp; +} + +static gsr_replay_buffer_file* gsr_replay_buffer_file_create(char *replay_directory, size_t replay_storage_counter, double timestamp, int *replay_storage_fd) { + gsr_replay_buffer_file *self = calloc(1, sizeof(gsr_replay_buffer_file)); + if(!self) { + fprintf(stderr, "gsr error: gsr_av_packet_file_init: failed to create buffer file\n"); + return NULL; + } + + if(create_directory_recursive(replay_directory) != 0) { + fprintf(stderr, "gsr error: gsr_av_packet_file_init: failed to create replay directory: %s\n", replay_directory); + free(self); + return NULL; + } + + char filename[PATH_MAX]; + snprintf(filename, sizeof(filename), "%s/%s_%d.mp4", replay_directory, FILE_PREFIX, (int)replay_storage_counter); + *replay_storage_fd = creat(filename, 0700); + if(*replay_storage_fd <= 0) { + fprintf(stderr, "gsr error: gsr_av_packet_file_init: failed to create replay file: %s\n", filename); + free(self); + return NULL; + } + + self->id = replay_storage_counter; + self->start_timestamp = timestamp; + self->end_timestamp = timestamp; + self->ref_counter = 1; + self->fd = -1; + + self->packets = NULL; + self->capacity_num_packets = 0; + self->num_packets = 0; + return self; +} + +static gsr_replay_buffer_file* gsr_replay_buffer_file_ref(gsr_replay_buffer_file *self) { + if(self->ref_counter >= 1) + ++self->ref_counter; + return self; +} + +static void gsr_replay_buffer_file_free(gsr_replay_buffer_file *self, const char *replay_directory) { + self->ref_counter = 0; + + if(self->fd > 0) { + close(self->fd); + self->fd = -1; + } + + char filename[PATH_MAX]; + snprintf(filename, sizeof(filename), "%s/%s_%d.mp4", replay_directory, FILE_PREFIX, (int)self->id); + remove(filename); + + if(self->packets) { + free(self->packets); + self->packets = NULL; + } + self->num_packets = 0; + self->capacity_num_packets = 0; + + free(self); +} + +static void gsr_replay_buffer_file_unref(gsr_replay_buffer_file *self, const char *replay_directory) { + if(self->ref_counter > 0) + --self->ref_counter; + + if(self->ref_counter <= 0) + gsr_replay_buffer_file_free(self, replay_directory); +} + +static void gsr_replay_buffer_disk_clear(gsr_replay_buffer *replay_buffer) { + gsr_replay_buffer_disk *self = (gsr_replay_buffer_disk*)replay_buffer; + gsr_replay_buffer_lock(&self->replay_buffer); + for(size_t i = 0; i < self->num_files; ++i) { + gsr_replay_buffer_file_unref(self->files[i], self->replay_directory); + } + self->num_files = 0; + gsr_replay_buffer_unlock(&self->replay_buffer); + + if(self->storage_fd > 0) { + close(self->storage_fd); + self->storage_fd = 0; + } + + self->storage_counter = 0; + self->storage_num_bytes_written = 0; +} + +static void gsr_replay_buffer_disk_destroy(gsr_replay_buffer *replay_buffer) { + gsr_replay_buffer_disk *self = (gsr_replay_buffer_disk*)replay_buffer; + gsr_replay_buffer_disk_clear(replay_buffer); + + if(self->owns_directory) { + remove(self->replay_directory); + self->owns_directory = false; + } +} + +static bool file_write_all(int fd, const uint8_t *data, size_t size, size_t *bytes_written_total) { + *bytes_written_total = 0; + while(*bytes_written_total < size) { + const ssize_t bytes_written = write(fd, data + *bytes_written_total, size - *bytes_written_total); + if(bytes_written == -1) { + if(errno == EAGAIN) + continue; + else + return false; + } + *bytes_written_total += bytes_written; + } + return true; +} + +static bool gsr_replay_buffer_disk_create_next_file(gsr_replay_buffer_disk *self, double timestamp) { + if(self->num_files + 1 >= GSR_REPLAY_BUFFER_CAPACITY_NUM_FILES) { + fprintf(stderr, "gsr error: gsr_replay_buffer_disk_create_next_file: too many replay buffer files created! (> %d), either reduce the replay buffer time or report this as a bug\n", (int)GSR_REPLAY_BUFFER_CAPACITY_NUM_FILES); + return false; + } + + gsr_replay_buffer_file *replay_buffer_file = gsr_replay_buffer_file_create(self->replay_directory, self->storage_counter, timestamp, &self->storage_fd); + if(!replay_buffer_file) + return false; + + self->files[self->num_files] = replay_buffer_file; + ++self->num_files; + ++self->storage_counter; + return true; +} + +static bool gsr_replay_buffer_disk_append_to_current_file(gsr_replay_buffer_disk *self, const AVPacket *av_packet, double timestamp) { + gsr_replay_buffer_file *replay_buffer_file = self->files[self->num_files - 1]; + replay_buffer_file->end_timestamp = timestamp; + + if(replay_buffer_file->num_packets + 1 >= replay_buffer_file->capacity_num_packets) { + size_t new_capacity_num_packets = replay_buffer_file->capacity_num_packets * 2; + if(new_capacity_num_packets == 0) + new_capacity_num_packets = 256; + + void *new_packets = realloc(replay_buffer_file->packets, new_capacity_num_packets * sizeof(gsr_av_packet_disk)); + if(!new_packets) { + fprintf(stderr, "gsr error: gsr_replay_buffer_disk_append_to_current_file: failed to reallocate replay buffer file packets\n"); + return false; + } + + replay_buffer_file->capacity_num_packets = new_capacity_num_packets; + replay_buffer_file->packets = new_packets; + } + + gsr_av_packet_disk *packet = &replay_buffer_file->packets[replay_buffer_file->num_packets]; + gsr_av_packet_disk_init(packet, av_packet, self->storage_num_bytes_written, timestamp); + ++replay_buffer_file->num_packets; + + size_t bytes_written = 0; + const bool file_written = file_write_all(self->storage_fd, av_packet->data, av_packet->size, &bytes_written); + self->storage_num_bytes_written += bytes_written; + if(self->storage_num_bytes_written >= REPLAY_BUFFER_FILE_SIZE_BYTES) { + self->storage_num_bytes_written = 0; + close(self->storage_fd); + self->storage_fd = 0; + } + + return file_written; +} + +static void gsr_replay_buffer_disk_remove_first_file(gsr_replay_buffer_disk *self) { + gsr_replay_buffer_file_unref(self->files[0], self->replay_directory); + for(size_t i = 1; i < self->num_files; ++i) { + self->files[i - 1] = self->files[i]; + } + --self->num_files; +} + +static bool gsr_replay_buffer_disk_append(gsr_replay_buffer *replay_buffer, const AVPacket *av_packet, double timestamp) { + gsr_replay_buffer_disk *self = (gsr_replay_buffer_disk*)replay_buffer; + bool success = false; + gsr_replay_buffer_lock(&self->replay_buffer); + + if(self->storage_fd <= 0) { + if(!gsr_replay_buffer_disk_create_next_file(self, timestamp)) + goto done; + } + + const bool data_written = gsr_replay_buffer_disk_append_to_current_file(self, av_packet, timestamp); + + if(self->num_files > 1) { + const double buffer_time_accumulated = timestamp - self->files[1]->start_timestamp; + if(buffer_time_accumulated >= self->replay_buffer_time) + gsr_replay_buffer_disk_remove_first_file(self); + } + + success = data_written; + + done: + gsr_replay_buffer_unlock(&self->replay_buffer); + return success; +} + +static AVPacket* gsr_replay_buffer_disk_iterator_get_packet(gsr_replay_buffer *replay_buffer, gsr_replay_buffer_iterator iterator) { + gsr_replay_buffer_disk *self = (gsr_replay_buffer_disk*)replay_buffer; + assert(iterator.file_index < self->num_files); + assert(iterator.packet_index < self->files[iterator.file_index]->num_packets); + return &self->files[iterator.file_index]->packets[iterator.packet_index].packet; +} + +static uint8_t* gsr_replay_buffer_disk_iterator_get_packet_data(gsr_replay_buffer *replay_buffer, gsr_replay_buffer_iterator iterator) { + gsr_replay_buffer_disk *self = (gsr_replay_buffer_disk*)replay_buffer; + assert(iterator.file_index < self->num_files); + gsr_replay_buffer_file *file = self->files[iterator.file_index]; + assert(iterator.packet_index < file->num_packets); + + if(file->fd <= 0) { + char filename[PATH_MAX]; + snprintf(filename, sizeof(filename), "%s/%s_%d.mp4", self->replay_directory, FILE_PREFIX, (int)file->id); + file->fd = open(filename, O_RDONLY); + if(file->fd <= 0) + return NULL; + } + + const gsr_av_packet_disk *packet = &self->files[iterator.file_index]->packets[iterator.packet_index]; + if(lseek(file->fd, packet->data_index, SEEK_SET) == -1) + return NULL; + + uint8_t *packet_data = malloc(packet->packet.size); + if(read(file->fd, packet_data, packet->packet.size) != packet->packet.size) { + free(packet_data); + return NULL; + } + + return packet_data; +} + +static gsr_replay_buffer* gsr_replay_buffer_disk_clone(gsr_replay_buffer *replay_buffer) { + gsr_replay_buffer_disk *self = (gsr_replay_buffer_disk*)replay_buffer; + gsr_replay_buffer_disk *destination = calloc(1, sizeof(gsr_replay_buffer_disk)); + if(!destination) + return NULL; + + gsr_replay_buffer_disk_set_impl_funcs(destination); + gsr_replay_buffer_lock(&self->replay_buffer); + + destination->replay_buffer.original_replay_buffer = replay_buffer; + destination->replay_buffer.mutex = self->replay_buffer.mutex; + destination->replay_buffer.mutex_initialized = self->replay_buffer.mutex_initialized; + destination->replay_buffer_time = self->replay_buffer_time; + destination->storage_counter = self->storage_counter; + destination->storage_num_bytes_written = self->storage_num_bytes_written; + destination->storage_fd = 0; // We only want to read from the clone. If there is a need to write to it in the future then TODO change this + + for(size_t i = 0; i < self->num_files; ++i) { + destination->files[i] = gsr_replay_buffer_file_ref(self->files[i]); + } + destination->num_files = self->num_files; + + snprintf(destination->replay_directory, sizeof(destination->replay_directory), "%s", self->replay_directory); + destination->owns_directory = false; + + gsr_replay_buffer_unlock(&self->replay_buffer); + return (gsr_replay_buffer*)destination; +} + +/* Binary search */ +static size_t gsr_replay_buffer_file_find_packet_index_by_time_passed(const gsr_replay_buffer_file *self, int seconds) { + const double now = clock_get_monotonic_seconds(); + if(self->num_packets == 0) { + return 0; + } + + size_t lower_bound = 0; + size_t upper_bound = self->num_packets; + size_t index = 0; + + for(;;) { + index = lower_bound + (upper_bound - lower_bound) / 2; + const gsr_av_packet_disk *packet = &self->packets[index]; + const double time_passed_since_packet = now - packet->timestamp; + if(time_passed_since_packet >= seconds) { + if(lower_bound == index) + break; + lower_bound = index; + } else { + if(upper_bound == index) + break; + upper_bound = index; + } + } + + return index; +} + +/* Binary search */ +static gsr_replay_buffer_iterator gsr_replay_buffer_disk_find_file_index_by_time_passed(gsr_replay_buffer *replay_buffer, int seconds) { + gsr_replay_buffer_disk *self = (gsr_replay_buffer_disk*)replay_buffer; + gsr_replay_buffer_lock(&self->replay_buffer); + + const double now = clock_get_monotonic_seconds(); + if(self->num_files == 0) { + gsr_replay_buffer_unlock(&self->replay_buffer); + return (gsr_replay_buffer_iterator){0, 0}; + } + + size_t lower_bound = 0; + size_t upper_bound = self->num_files; + size_t file_index = 0; + + for(;;) { + file_index = lower_bound + (upper_bound - lower_bound) / 2; + const gsr_replay_buffer_file *file = self->files[file_index]; + const double time_passed_since_file_start = now - file->start_timestamp; + const double time_passed_since_file_end = now - file->end_timestamp; + if(time_passed_since_file_start >= seconds && time_passed_since_file_end <= seconds) { + break; + } else if(time_passed_since_file_start >= seconds) { + if(lower_bound == file_index) + break; + lower_bound = file_index; + } else { + if(upper_bound == file_index) + break; + upper_bound = file_index; + } + } + + const gsr_replay_buffer_file *file = self->files[file_index]; + const size_t packet_index = gsr_replay_buffer_file_find_packet_index_by_time_passed(file, seconds); + + gsr_replay_buffer_unlock(&self->replay_buffer); + return (gsr_replay_buffer_iterator){packet_index, file_index}; +} + +static gsr_replay_buffer_iterator gsr_replay_buffer_disk_find_keyframe(gsr_replay_buffer *replay_buffer, gsr_replay_buffer_iterator start_iterator, int stream_index, bool invert_stream_index) { + gsr_replay_buffer_disk *self = (gsr_replay_buffer_disk*)replay_buffer; + gsr_replay_buffer_iterator keyframe_iterator = {(size_t)-1, 0}; + gsr_replay_buffer_lock(&self->replay_buffer); + for(size_t file_index = start_iterator.file_index; file_index < self->num_files; ++file_index) { + const gsr_replay_buffer_file *file = self->files[file_index]; + for(size_t packet_index = start_iterator.packet_index; packet_index < file->num_packets; ++packet_index) { + const gsr_av_packet_disk *packet = &file->packets[packet_index]; + if((packet->packet.flags & AV_PKT_FLAG_KEY) && (invert_stream_index ? packet->packet.stream_index != stream_index : packet->packet.stream_index == stream_index)) { + keyframe_iterator.packet_index = packet_index; + keyframe_iterator.file_index = file_index; + break; + } + } + } + gsr_replay_buffer_unlock(&self->replay_buffer); + return keyframe_iterator; +} + +static bool gsr_replay_buffer_disk_iterator_next(gsr_replay_buffer *replay_buffer, gsr_replay_buffer_iterator *iterator) { + gsr_replay_buffer_disk *self = (gsr_replay_buffer_disk*)replay_buffer; + if(iterator->file_index >= self->num_files) + return false; + + if(iterator->packet_index + 1 >= self->files[iterator->file_index]->num_packets) { + if(iterator->file_index + 1 >= self->num_files) + return false; + + if(self->files[iterator->file_index + 1]->num_packets == 0) + return false; + + ++iterator->file_index; + iterator->packet_index = 0; + return true; + } else { + ++iterator->packet_index; + return true; + } +} + +static void get_current_time(char *time_str, size_t time_str_size) { + time_t now = time(NULL); + struct tm *t = localtime(&now); + strftime(time_str, time_str_size - 1, "%Y-%m-%d_%H-%M-%S", t); +} + +static void gsr_replay_buffer_disk_set_impl_funcs(gsr_replay_buffer_disk *self) { + self->replay_buffer.destroy = gsr_replay_buffer_disk_destroy; + self->replay_buffer.append = gsr_replay_buffer_disk_append; + self->replay_buffer.clear = gsr_replay_buffer_disk_clear; + self->replay_buffer.iterator_get_packet = gsr_replay_buffer_disk_iterator_get_packet; + self->replay_buffer.iterator_get_packet_data = gsr_replay_buffer_disk_iterator_get_packet_data; + self->replay_buffer.clone = gsr_replay_buffer_disk_clone; + self->replay_buffer.find_packet_index_by_time_passed = gsr_replay_buffer_disk_find_file_index_by_time_passed; + self->replay_buffer.find_keyframe = gsr_replay_buffer_disk_find_keyframe; + self->replay_buffer.iterator_next = gsr_replay_buffer_disk_iterator_next; +} + +gsr_replay_buffer* gsr_replay_buffer_disk_create(const char *replay_directory, double replay_buffer_time) { + assert(replay_buffer_time > 0); + gsr_replay_buffer_disk *replay_buffer = calloc(1, sizeof(gsr_replay_buffer_disk)); + if(!replay_buffer) + return NULL; + + char time_str[128]; + get_current_time(time_str, sizeof(time_str)); + + replay_buffer->num_files = 0; + replay_buffer->storage_counter = 0; + replay_buffer->replay_buffer_time = replay_buffer_time; + snprintf(replay_buffer->replay_directory, sizeof(replay_buffer->replay_directory), "%s/gsr-replay-%s.gsr", replay_directory, time_str); + replay_buffer->owns_directory = true; + + gsr_replay_buffer_disk_set_impl_funcs(replay_buffer); + return (gsr_replay_buffer*)replay_buffer; +} diff --git a/src/replay_buffer/replay_buffer_ram.c b/src/replay_buffer/replay_buffer_ram.c new file mode 100644 index 0000000..890588f --- /dev/null +++ b/src/replay_buffer/replay_buffer_ram.c @@ -0,0 +1,256 @@ +#include "../../include/replay_buffer/replay_buffer_ram.h" +#include "../../include/utils.h" + +#include <stdlib.h> +#include <string.h> +#include <assert.h> + +#include <libavutil/mem.h> + +static void gsr_replay_buffer_ram_set_impl_funcs(gsr_replay_buffer_ram *self); + +static gsr_av_packet_ram* gsr_av_packet_ram_create(const AVPacket *av_packet, double timestamp) { + gsr_av_packet_ram *self = malloc(sizeof(gsr_av_packet_ram)); + if(!self) + return NULL; + + self->ref_counter = 1; + self->packet = *av_packet; + self->timestamp = timestamp; + // Why are we doing this you ask? there is a ffmpeg bug that causes cpu usage to increase over time when you have + // packets that are not being free'd until later. So we copy the packet data, free the packet and then reconstruct + // the packet later on when we need it, to keep packets alive only for a short period. + self->packet.data = av_memdup(av_packet->data, av_packet->size); + if(!self->packet.data) { + free(self); + return NULL; + } + + return self; +} + +static gsr_av_packet_ram* gsr_av_packet_ram_ref(gsr_av_packet_ram *self) { + if(self->ref_counter >= 1) + ++self->ref_counter; + return self; +} + +static void gsr_av_packet_ram_free(gsr_av_packet_ram *self) { + self->ref_counter = 0; + if(self->packet.data) { + av_free(self->packet.data); + self->packet.data = NULL; + } + free(self); +} + +static void gsr_av_packet_ram_unref(gsr_av_packet_ram *self) { + if(self->ref_counter >= 1) + --self->ref_counter; + + if(self->ref_counter <= 0) + gsr_av_packet_ram_free(self); +} + +static void gsr_replay_buffer_ram_destroy(gsr_replay_buffer *replay_buffer) { + gsr_replay_buffer_ram *self = (gsr_replay_buffer_ram*)replay_buffer; + gsr_replay_buffer_lock(&self->replay_buffer); + for(size_t i = 0; i < self->num_packets; ++i) { + if(self->packets[i]) { + gsr_av_packet_ram_unref(self->packets[i]); + self->packets[i] = NULL; + } + } + self->num_packets = 0; + gsr_replay_buffer_unlock(&self->replay_buffer); + + if(self->packets) { + free(self->packets); + self->packets = NULL; + } + + self->capacity_num_packets = 0; + self->index = 0; +} + +static bool gsr_replay_buffer_ram_append(gsr_replay_buffer *replay_buffer, const AVPacket *av_packet, double timestamp) { + gsr_replay_buffer_ram *self = (gsr_replay_buffer_ram*)replay_buffer; + gsr_replay_buffer_lock(&self->replay_buffer); + gsr_av_packet_ram *packet = gsr_av_packet_ram_create(av_packet, timestamp); + if(!packet) { + gsr_replay_buffer_unlock(&self->replay_buffer); + return false; + } + + if(self->packets[self->index]) { + gsr_av_packet_ram_unref(self->packets[self->index]); + self->packets[self->index] = NULL; + } + self->packets[self->index] = packet; + + self->index = (self->index + 1) % self->capacity_num_packets; + ++self->num_packets; + if(self->num_packets > self->capacity_num_packets) + self->num_packets = self->capacity_num_packets; + + gsr_replay_buffer_unlock(&self->replay_buffer); + return true; +} + +static void gsr_replay_buffer_ram_clear(gsr_replay_buffer *replay_buffer) { + gsr_replay_buffer_ram *self = (gsr_replay_buffer_ram*)replay_buffer; + gsr_replay_buffer_lock(&self->replay_buffer); + for(size_t i = 0; i < self->num_packets; ++i) { + if(self->packets[i]) { + gsr_av_packet_ram_unref(self->packets[i]); + self->packets[i] = NULL; + } + } + self->num_packets = 0; + self->index = 0; + gsr_replay_buffer_unlock(&self->replay_buffer); +} + +static gsr_av_packet_ram* gsr_replay_buffer_ram_get_packet_at_index(gsr_replay_buffer *replay_buffer, size_t index) { + gsr_replay_buffer_ram *self = (gsr_replay_buffer_ram*)replay_buffer; + assert(index < self->num_packets); + size_t start_index = 0; + if(self->num_packets < self->capacity_num_packets) + start_index = self->num_packets - self->index; + else + start_index = self->index; + + const size_t offset = (start_index + index) % self->capacity_num_packets; + return self->packets[offset]; +} + +static AVPacket* gsr_replay_buffer_ram_iterator_get_packet(gsr_replay_buffer *replay_buffer, gsr_replay_buffer_iterator iterator) { + return &gsr_replay_buffer_ram_get_packet_at_index(replay_buffer, iterator.packet_index)->packet; +} + +static uint8_t* gsr_replay_buffer_ram_iterator_get_packet_data(gsr_replay_buffer *replay_buffer, gsr_replay_buffer_iterator iterator) { + (void)replay_buffer; + (void)iterator; + return NULL; +} + +static gsr_replay_buffer* gsr_replay_buffer_ram_clone(gsr_replay_buffer *replay_buffer) { + gsr_replay_buffer_ram *self = (gsr_replay_buffer_ram*)replay_buffer; + gsr_replay_buffer_ram *destination = calloc(1, sizeof(gsr_replay_buffer_ram)); + if(!destination) + return NULL; + + gsr_replay_buffer_ram_set_impl_funcs(destination); + gsr_replay_buffer_lock(&self->replay_buffer); + + destination->replay_buffer.original_replay_buffer = replay_buffer; + destination->replay_buffer.mutex = self->replay_buffer.mutex; + destination->replay_buffer.mutex_initialized = self->replay_buffer.mutex_initialized; + destination->capacity_num_packets = self->capacity_num_packets; + destination->index = self->index; + destination->packets = calloc(destination->capacity_num_packets, sizeof(gsr_av_packet_ram*)); + if(!destination->packets) { + free(destination); + gsr_replay_buffer_unlock(&self->replay_buffer); + return NULL; + } + + destination->num_packets = self->num_packets; + for(size_t i = 0; i < destination->num_packets; ++i) { + destination->packets[i] = gsr_av_packet_ram_ref(self->packets[i]); + } + + gsr_replay_buffer_unlock(&self->replay_buffer); + return (gsr_replay_buffer*)destination; +} + +/* Binary search */ +static gsr_replay_buffer_iterator gsr_replay_buffer_ram_find_packet_index_by_time_passed(gsr_replay_buffer *replay_buffer, int seconds) { + gsr_replay_buffer_ram *self = (gsr_replay_buffer_ram*)replay_buffer; + gsr_replay_buffer_lock(&self->replay_buffer); + + const double now = clock_get_monotonic_seconds(); + if(self->num_packets == 0) { + gsr_replay_buffer_unlock(&self->replay_buffer); + return (gsr_replay_buffer_iterator){0, 0}; + } + + size_t lower_bound = 0; + size_t upper_bound = self->num_packets; + size_t index = 0; + + for(;;) { + index = lower_bound + (upper_bound - lower_bound) / 2; + const gsr_av_packet_ram *packet = gsr_replay_buffer_ram_get_packet_at_index(replay_buffer, index); + const double time_passed_since_packet = now - packet->timestamp; + if(time_passed_since_packet >= seconds) { + if(lower_bound == index) + break; + lower_bound = index; + } else { + if(upper_bound == index) + break; + upper_bound = index; + } + } + + gsr_replay_buffer_unlock(&self->replay_buffer); + return (gsr_replay_buffer_iterator){index, 0}; +} + +static gsr_replay_buffer_iterator gsr_replay_buffer_ram_find_keyframe(gsr_replay_buffer *replay_buffer, gsr_replay_buffer_iterator start_iterator, int stream_index, bool invert_stream_index) { + gsr_replay_buffer_ram *self = (gsr_replay_buffer_ram*)replay_buffer; + size_t keyframe_index = (size_t)-1; + gsr_replay_buffer_lock(&self->replay_buffer); + for(size_t i = start_iterator.packet_index; i < self->num_packets; ++i) { + const gsr_av_packet_ram *packet = gsr_replay_buffer_ram_get_packet_at_index(replay_buffer, i); + if((packet->packet.flags & AV_PKT_FLAG_KEY) && (invert_stream_index ? packet->packet.stream_index != stream_index : packet->packet.stream_index == stream_index)) { + keyframe_index = i; + break; + } + } + gsr_replay_buffer_unlock(&self->replay_buffer); + return (gsr_replay_buffer_iterator){keyframe_index, 0}; +} + +static bool gsr_replay_buffer_ram_iterator_next(gsr_replay_buffer *replay_buffer, gsr_replay_buffer_iterator *iterator) { + gsr_replay_buffer_ram *self = (gsr_replay_buffer_ram*)replay_buffer; + if(iterator->packet_index + 1 < self->num_packets) { + ++iterator->packet_index; + return true; + } else { + return false; + } +} + +static void gsr_replay_buffer_ram_set_impl_funcs(gsr_replay_buffer_ram *self) { + self->replay_buffer.destroy = gsr_replay_buffer_ram_destroy; + self->replay_buffer.append = gsr_replay_buffer_ram_append; + self->replay_buffer.clear = gsr_replay_buffer_ram_clear; + self->replay_buffer.iterator_get_packet = gsr_replay_buffer_ram_iterator_get_packet; + self->replay_buffer.iterator_get_packet_data = gsr_replay_buffer_ram_iterator_get_packet_data; + self->replay_buffer.clone = gsr_replay_buffer_ram_clone; + self->replay_buffer.find_packet_index_by_time_passed = gsr_replay_buffer_ram_find_packet_index_by_time_passed; + self->replay_buffer.find_keyframe = gsr_replay_buffer_ram_find_keyframe; + self->replay_buffer.iterator_next = gsr_replay_buffer_ram_iterator_next; +} + +gsr_replay_buffer* gsr_replay_buffer_ram_create(size_t replay_buffer_num_packets) { + assert(replay_buffer_num_packets > 0); + gsr_replay_buffer_ram *replay_buffer = calloc(1, sizeof(gsr_replay_buffer_ram)); + if(!replay_buffer) + return NULL; + + replay_buffer->capacity_num_packets = replay_buffer_num_packets; + replay_buffer->num_packets = 0; + replay_buffer->index = 0; + replay_buffer->packets = calloc(replay_buffer->capacity_num_packets, sizeof(gsr_av_packet_ram*)); + if(!replay_buffer->packets) { + gsr_replay_buffer_ram_destroy(&replay_buffer->replay_buffer); + free(replay_buffer); + return NULL; + } + + gsr_replay_buffer_ram_set_impl_funcs(replay_buffer); + return (gsr_replay_buffer*)replay_buffer; +} |