diff options
author | dec05eba <dec05eba@protonmail.com> | 2019-08-22 00:59:49 +0200 |
---|---|---|
committer | dec05eba <dec05eba@protonmail.com> | 2020-07-25 14:36:46 +0200 |
commit | d6f368a3f400fea3e89280262a8147e7ce5d855c (patch) | |
tree | 4eb64eb0d18dad1e40c70a5bff974fe8033fe389 /src | |
parent | df640dc7f55fef962b598562e10d8dd4d60fedc0 (diff) |
Move thread work from compiler/parser to thread_work file, fix use after free bug in multithreaded parser allocator
Diffstat (limited to 'src')
-rw-r--r-- | src/ast.c | 9 | ||||
-rw-r--r-- | src/compiler.c | 390 | ||||
-rw-r--r-- | src/parser.c | 25 | ||||
-rw-r--r-- | src/program.c | 4 | ||||
-rw-r--r-- | src/ssa/ssa.c | 12 | ||||
-rw-r--r-- | src/std/alloc.c | 2 | ||||
-rw-r--r-- | src/std/arena_allocator.c | 6 | ||||
-rw-r--r-- | src/std/log.c | 2 | ||||
-rw-r--r-- | src/std/thread.c | 4 | ||||
-rw-r--r-- | src/std/thread_pool.c | 187 |
10 files changed, 302 insertions, 339 deletions
@@ -421,13 +421,10 @@ int scope_add_child(Scope *self, Ast *child) { } void scope_resolve(Scope *self, AstCompilerContext *context) { - Ast **ast; - Ast **ast_end; - Scope *prev_scope; + Ast **ast = buffer_begin(&self->ast_objects); + Ast **ast_end = buffer_end(&self->ast_objects); + Scope *prev_scope = context->scope; - ast = buffer_begin(&self->ast_objects); - ast_end = buffer_end(&self->ast_objects); - prev_scope = context->scope; context->scope = self; for(; ast != ast_end; ++ast) { ast_resolve(*ast, context); diff --git a/src/compiler.c b/src/compiler.c index 803515b..fd9e4ae 100644 --- a/src/compiler.c +++ b/src/compiler.c @@ -25,37 +25,40 @@ static usize strnlen(const char *str, usize max_length) { } /* TODO: Allow to specify size and members? */ -static CHECK_RESULT int create_default_type(amal_compiler *compiler, const char *name, LhsExpr **lhs_expr) { +static CHECK_RESULT int create_default_type(amal_compiler *compiler, const char *name, amal_default_type **default_type) { StructDecl *struct_decl; Ast *expr; + LhsExpr *lhs_expr; return_if_error(arena_allocator_alloc(&compiler->allocator, sizeof(StructDecl), (void**)&struct_decl)); return_if_error(structdecl_init(struct_decl, &compiler->root_scope, &compiler->allocator)); - return_if_error(arena_allocator_alloc(&compiler->allocator, sizeof(LhsExpr), (void**)lhs_expr)); - lhsexpr_init(*lhs_expr, DECL_FLAG_EXTERN | DECL_FLAG_PUB | DECL_FLAG_CONST, create_buffer_view(name, strnlen(name, PATH_MAX))); - return_if_error(ast_create(&compiler->allocator, struct_decl, AST_STRUCT_DECL, &(*lhs_expr)->rhs_expr)); - return_if_error(ast_create(&compiler->allocator, *lhs_expr, AST_LHS, &expr)); + return_if_error(arena_allocator_alloc(&compiler->allocator, sizeof(amal_default_type), (void**)default_type)); + lhs_expr = &(*default_type)->lhs_expr; + + lhsexpr_init(lhs_expr, DECL_FLAG_EXTERN | DECL_FLAG_PUB | DECL_FLAG_CONST, create_buffer_view(name, strnlen(name, PATH_MAX))); + return_if_error(ast_create(&compiler->allocator, struct_decl, AST_STRUCT_DECL, &lhs_expr->rhs_expr)); + return_if_error(ast_create(&compiler->allocator, lhs_expr, AST_LHS, &expr)); expr->resolve_data.type.type = RESOLVED_TYPE_LHS_EXPR; - expr->resolve_data.type.value.lhs_expr = *lhs_expr; + expr->resolve_data.type.value.lhs_expr = lhs_expr; expr->resolve_data.status = AST_RESOLVED; return scope_add_child(&compiler->root_scope, expr); } static CHECK_RESULT int init_default_types(amal_compiler *compiler) { - return_if_error(create_default_type(compiler, "i8", (LhsExpr**)&compiler->default_types.i8)); - return_if_error(create_default_type(compiler, "i16", (LhsExpr**)&compiler->default_types.i16)); - return_if_error(create_default_type(compiler, "i32", (LhsExpr**)&compiler->default_types.i32)); - return_if_error(create_default_type(compiler, "i64", (LhsExpr**)&compiler->default_types.i64)); - return_if_error(create_default_type(compiler, "u8", (LhsExpr**)&compiler->default_types.u8)); - return_if_error(create_default_type(compiler, "u16", (LhsExpr**)&compiler->default_types.u16)); - return_if_error(create_default_type(compiler, "u32", (LhsExpr**)&compiler->default_types.u32)); - return_if_error(create_default_type(compiler, "u64", (LhsExpr**)&compiler->default_types.u64)); - return_if_error(create_default_type(compiler, "isize", (LhsExpr**)&compiler->default_types.isize)); - return_if_error(create_default_type(compiler, "usize", (LhsExpr**)&compiler->default_types.usize)); - return_if_error(create_default_type(compiler, "f32", (LhsExpr**)&compiler->default_types.f32)); - return_if_error(create_default_type(compiler, "f64", (LhsExpr**)&compiler->default_types.f64)); - return_if_error(create_default_type(compiler, "str", (LhsExpr**)&compiler->default_types.str)); + return_if_error(create_default_type(compiler, "i8", &compiler->default_types.i8)); + return_if_error(create_default_type(compiler, "i16", &compiler->default_types.i16)); + return_if_error(create_default_type(compiler, "i32", &compiler->default_types.i32)); + return_if_error(create_default_type(compiler, "i64", &compiler->default_types.i64)); + return_if_error(create_default_type(compiler, "u8", &compiler->default_types.u8)); + return_if_error(create_default_type(compiler, "u16", &compiler->default_types.u16)); + return_if_error(create_default_type(compiler, "u32", &compiler->default_types.u32)); + return_if_error(create_default_type(compiler, "u64", &compiler->default_types.u64)); + return_if_error(create_default_type(compiler, "isize", &compiler->default_types.isize)); + return_if_error(create_default_type(compiler, "usize", &compiler->default_types.usize)); + return_if_error(create_default_type(compiler, "f32", &compiler->default_types.f32)); + return_if_error(create_default_type(compiler, "f64", &compiler->default_types.f64)); + return_if_error(create_default_type(compiler, "str", &compiler->default_types.str)); compiler->default_types.arithmetic_types[0] = compiler->default_types.i8; compiler->default_types.arithmetic_types[1] = compiler->default_types.u8; @@ -86,11 +89,10 @@ static CHECK_RESULT int init_default_types(amal_compiler *compiler) { bool is_arithmetic_type(LhsExpr *expr, amal_compiler *compiler) { usize i; - const amal_default_types *default_types; - default_types = &compiler->default_types; + const amal_default_types *default_types = &compiler->default_types; for(i = 0; i < NUM_ARITHMETIC_TYPES; ++i) { - if(expr == (LhsExpr*)default_types->arithmetic_types[i]) + if(expr == &default_types->arithmetic_types[i]->lhs_expr) return bool_true; } return bool_false; @@ -103,19 +105,6 @@ void amal_compiler_options_init(amal_compiler_options *self) { } static CHECK_RESULT int amal_compiler_init(amal_compiler *self, const amal_compiler_options *options, amal_program *program) { - int i; - - self->usable_thread_count = options ? options->num_threads : 0; - if(self->usable_thread_count == 0) { - self->usable_thread_count = amal_get_usable_thread_count(); - if(self->usable_thread_count == 0) { - amal_log_warning("Unable to get the number of threads available on the system, using 1 thread."); - amal_log_warning("You can override the number of threads used by setting compiler option for number of thread to use."); - self->usable_thread_count = 1; - } - } - amal_log_info("Using %d threads", self->usable_thread_count); - am_memset(&self->allocator, 0, sizeof(self->allocator)); am_memset(&self->root_scope, 0, sizeof(self->root_scope)); if(options) @@ -124,20 +113,13 @@ static CHECK_RESULT int amal_compiler_init(amal_compiler *self, const amal_compi amal_compiler_options_init(&self->options); self->program = program; self->started = bool_false; - self->work_failed = bool_false; - self->generic_work_object_index = 0; - amal_mutex_init(&self->mutex); + return_if_error(thread_pool_init(&self->stage_task_thread_pool, options ? options->num_threads : 0)); + return_if_error(amal_mutex_init(&self->mutex)); return_if_error(arena_allocator_init(&self->allocator)); cleanup_if_error(scope_init(&self->root_scope, NULL, &self->allocator)); cleanup_if_error(buffer_init(&self->parsers, &self->allocator)); - cleanup_if_error(buffer_init(&self->queued_files, &self->allocator)); cleanup_if_error(hash_map_init(&self->file_scopes, &self->allocator, sizeof(FileScopeReference*), hash_map_compare_string, amal_hash_string)); - cleanup_if_error(arena_allocator_alloc(&self->allocator, - self->usable_thread_count * sizeof(ParserThreadData), - (void**)&self->threads)); - for(i = 0; i < self->usable_thread_count; ++i) - cleanup_if_error(parser_thread_data_init(&self->threads[i])); cleanup_if_error(init_default_types(self)); return AMAL_COMPILER_OK; @@ -147,11 +129,13 @@ static CHECK_RESULT int amal_compiler_init(amal_compiler *self, const amal_compi } void amal_compiler_deinit(amal_compiler *self) { - int i; - for(i = 0; i < self->usable_thread_count; ++i) { - parser_thread_data_deinit(&self->threads[i]); + Parser **parser = buffer_begin(&self->parsers); + Parser **parser_end = buffer_end(&self->parsers); + for(; parser != parser_end; ++parser) { + if((*parser)->allocator) + arena_allocator_deinit((*parser)->allocator); } - + thread_pool_deinit(&self->stage_task_thread_pool); amal_mutex_deinit(&self->mutex); arena_allocator_deinit(&self->allocator); } @@ -165,13 +149,11 @@ typedef enum { typedef struct { amal_compiler *compiler; - ParserThreadData *parser_thread_data; FileScopeReference *file_scope; } CompilerParserThreadUserData; typedef struct { amal_compiler *compiler; - ParserThreadData *parser_thread_data; Parser *parser; ThreadWorkType work_type; } CompilerGenericThreadUserData; @@ -184,19 +166,25 @@ typedef struct { ThreadWorkType type; } ThreadWorkData; -static CHECK_RESULT int amal_compiler_load_in_this_thread(amal_compiler *compiler, FileScopeReference *file_scope, ArenaAllocator *allocator) { +static CHECK_RESULT int amal_compiler_load_in_this_thread(amal_compiler *compiler, FileScopeReference *file_scope) { Parser *parser; int result; BufferView filepath; + ArenaAllocator *parser_allocator; result = AMAL_COMPILER_ERR; + cleanup_if_error(amal_mutex_lock(&compiler->mutex, "amal_compiler_load_in_this_thread, create arena allocator")); + return_if_error(arena_allocator_alloc(&compiler->allocator, sizeof(ArenaAllocator), (void**)&parser_allocator)); + amal_mutex_tryunlock(&compiler->mutex); + return_if_error(arena_allocator_init(parser_allocator)); + filepath = create_buffer_view(file_scope->canonical_path.data, file_scope->canonical_path.size); amal_log_info("Started parsing %.*s", filepath.size, filepath.data); - return_if_error(arena_allocator_alloc(allocator, sizeof(Parser), (void**)&parser)); - return_if_error(parser_init(parser, compiler, allocator)); + return_if_error(arena_allocator_alloc(parser_allocator, sizeof(Parser), (void**)&parser)); + return_if_error(parser_init(parser, compiler, parser_allocator)); file_scope->parser = parser; return_if_error(parser_parse_file(parser, filepath)); - cleanup_if_error(amal_mutex_lock(&compiler->mutex, "amal_compiler_load_in_this_thread")); + cleanup_if_error(amal_mutex_lock(&compiler->mutex, "amal_compiler_load_in_this_thread, add parser")); cleanup_if_error(buffer_append(&compiler->parsers, &parser, sizeof(parser))); amal_log_info("Finished parsing %.*s", filepath.size, filepath.data); result = AMAL_COMPILER_OK; @@ -206,51 +194,12 @@ static CHECK_RESULT int amal_compiler_load_in_this_thread(amal_compiler *compile return result; } -static void* thread_callback_parse_file(void *userdata) { - FileScopeReference *file_scope; - CompilerParserThreadUserData compiler_parser_userdata; - void *result; - assert(!amal_thread_is_main()); - - am_memcpy(&compiler_parser_userdata, userdata, sizeof(compiler_parser_userdata)); +static int thread_callback_parse_file(void *userdata) { + int result; + CompilerParserThreadUserData *compiler_parser_userdata = userdata; + result = amal_compiler_load_in_this_thread(compiler_parser_userdata->compiler, + compiler_parser_userdata->file_scope); am_free(userdata); - file_scope = compiler_parser_userdata.file_scope; - result = (void*)AMAL_COMPILER_ERR; - for(;;) { - int has_next; - /* Abort job if another job failed */ - if(compiler_parser_userdata.compiler->work_failed) { - result = (void*)AMAL_COMPILER_WORK_FAIL_ABORT; - goto cleanup; - } - cleanup_if_error(amal_compiler_load_in_this_thread(compiler_parser_userdata.compiler, - file_scope, - &compiler_parser_userdata.parser_thread_data->allocator)); - cleanup_if_error(amal_mutex_lock(&compiler_parser_userdata.compiler->mutex, "thread_callback_parse_file")); - has_next = buffer_pop(&compiler_parser_userdata.compiler->queued_files, &file_scope, sizeof(FileScopeReference*)); - amal_mutex_tryunlock(&compiler_parser_userdata.compiler->mutex); - if(has_next != 0) - break; - } - result = NULL; - - cleanup: - /* - To stop all other parsers from working cleanly, we simply clear the file queue, - and the other threads will stop when they are done with the file they are currently parsing. - */ - if(result != NULL) { - ignore_result_int(amal_mutex_lock(&compiler_parser_userdata.compiler->mutex, "thread_callback_parse_file")); - buffer_clear(&compiler_parser_userdata.compiler->queued_files); - } - /* - There can be a data race between writing to this and when this is read in @amal_compiler_load_file_join_threads. - This is intentional and it's ok, because the join_threads function checks against this status is guaranteed at this point - to not be a certain value that it checks for. Writing to an int is atomic. - TODO: Verify this is ok on all platforms. - */ - compiler_parser_userdata.parser_thread_data->status = PARSER_THREAD_STATUS_IDLE; - amal_mutex_tryunlock(&compiler_parser_userdata.compiler->mutex); return result; } @@ -302,195 +251,72 @@ static CHECK_RESULT int thread_generate_bytecode(Parser *parser) { return result; } -/* TODO: Handle errors (stop work in all other threads and report errors/warnings) */ -static void* thread_callback_generic(void *userdata) { - CompilerGenericThreadUserData compiler_userdata; - Parser *parser; - void *result; - assert(!amal_thread_is_main()); - - am_memcpy(&compiler_userdata, userdata, sizeof(compiler_userdata)); - am_free(userdata); - parser = compiler_userdata.parser; - result = (void*)AMAL_COMPILER_ERR; - for(;;) { - /* TODO: stop work in all other threads on failure */ - switch(compiler_userdata.work_type) { - case THREAD_WORK_PARSE: { - assert(bool_false && "Thread work type can't ge 'parse' for generic work"); - break; - } - case THREAD_WORK_RESOLVE_AST: - cleanup_if_error(thread_resolve_ast(compiler_userdata.compiler, parser)); - break; - case THREAD_WORK_GENERATE_SSA: - cleanup_if_error(thread_generate_ssa(parser)); - break; - case THREAD_WORK_GENERATE_BYTECODE: - cleanup_if_error(thread_generate_bytecode(parser)); - break; - } - - /* Abort job if another job failed */ - if(compiler_userdata.compiler->work_failed) { - result = (void*)AMAL_COMPILER_WORK_FAIL_ABORT; - goto cleanup; +static int thread_callback_generic(void *userdata) { + int result; + CompilerGenericThreadUserData *compiler_userdata = userdata; + switch(compiler_userdata->work_type) { + case THREAD_WORK_PARSE: { + assert(bool_false && "Thread work type can't be 'parse' for generic work"); + break; } - - /* Find next job */ - cleanup_if_error(amal_mutex_lock(&compiler_userdata.compiler->mutex, "thread_callback_generic")); - if(compiler_userdata.compiler->generic_work_object_index + 1 >= (int)buffer_get_size(&compiler_userdata.compiler->parsers, Parser*)) + case THREAD_WORK_RESOLVE_AST: + result = thread_resolve_ast(compiler_userdata->compiler, compiler_userdata->parser); + break; + case THREAD_WORK_GENERATE_SSA: + result = thread_generate_ssa(compiler_userdata->parser); + break; + case THREAD_WORK_GENERATE_BYTECODE: + result = thread_generate_bytecode(compiler_userdata->parser); break; - ++compiler_userdata.compiler->generic_work_object_index; - parser = *(Parser**)buffer_get(&compiler_userdata.compiler->parsers, compiler_userdata.compiler->generic_work_object_index, sizeof(parser)); - amal_mutex_tryunlock(&compiler_userdata.compiler->mutex); - } - result = NULL; - - cleanup: - /* - To stop all other worker threads cleanly, we simply say we are done with all work in the queue, - and the other threads will stop when they are done with the work they are currently working on. - */ - if(result != NULL) { - ignore_result_int(amal_mutex_lock(&compiler_userdata.compiler->mutex, "thread_callback_generic")); - compiler_userdata.compiler->generic_work_object_index = (int)buffer_get_size(&compiler_userdata.compiler->parsers, Parser*); } - compiler_userdata.parser_thread_data->status = PARSER_THREAD_STATUS_IDLE; - amal_mutex_tryunlock(&compiler_userdata.compiler->mutex); + am_free(userdata); return result; } -static CHECK_RESULT int amal_compiler_select_thread_for_work(amal_compiler *self, ThreadWorkData work_data, ParserThreadData **thread_selected) { - int i; - int result; - ParserThreadData *parser_thread_data; - void *thread_user_data; - thread_user_data = NULL; - *thread_selected = NULL; - result = AMAL_COMPILER_OK; - - cleanup_if_error(amal_mutex_lock(&self->mutex, "amal_compiler_select_thread_for_work")); - for(i = 0; i < self->usable_thread_count; ++i) { - parser_thread_data = &self->threads[i]; - if(parser_thread_data->status == PARSER_THREAD_STATUS_RUNNING) - continue; - - switch(work_data.type) { - case THREAD_WORK_PARSE: { - CompilerParserThreadUserData *userdata; - cleanup_if_error(am_malloc(sizeof(CompilerParserThreadUserData), (void**)&userdata)); - thread_user_data = userdata; - userdata->compiler = self; - userdata->parser_thread_data = parser_thread_data; - userdata->file_scope = work_data.value.file_scope; - result = parser_thread_data_start(parser_thread_data, thread_callback_parse_file, userdata); - break; - } - case THREAD_WORK_RESOLVE_AST: - case THREAD_WORK_GENERATE_SSA: - case THREAD_WORK_GENERATE_BYTECODE: { - CompilerGenericThreadUserData *userdata; - cleanup_if_error(am_malloc(sizeof(CompilerGenericThreadUserData), (void**)&userdata)); - thread_user_data = userdata; - userdata->compiler = self; - userdata->parser_thread_data = parser_thread_data; - userdata->parser = work_data.value.parser; - userdata->work_type = work_data.type; - ++self->generic_work_object_index; - result = parser_thread_data_start(parser_thread_data, thread_callback_generic, userdata); - break; - } +static CHECK_RESULT int amal_compiler_add_task(amal_compiler *self, ThreadWorkData work_data) { + void *thread_user_data = NULL; + int result = AMAL_COMPILER_OK; + + switch(work_data.type) { + case THREAD_WORK_PARSE: { + CompilerParserThreadUserData *userdata; + cleanup_if_error(am_malloc(sizeof(CompilerParserThreadUserData), (void**)&userdata)); + thread_user_data = userdata; + userdata->compiler = self; + userdata->file_scope = work_data.value.file_scope; + result = thread_pool_add_task(&self->stage_task_thread_pool, thread_callback_parse_file, userdata); + break; + } + case THREAD_WORK_RESOLVE_AST: + case THREAD_WORK_GENERATE_SSA: + case THREAD_WORK_GENERATE_BYTECODE: { + CompilerGenericThreadUserData *userdata; + cleanup_if_error(am_malloc(sizeof(CompilerGenericThreadUserData), (void**)&userdata)); + thread_user_data = userdata; + userdata->compiler = self; + userdata->parser = work_data.value.parser; + userdata->work_type = work_data.type; + result = thread_pool_add_task(&self->stage_task_thread_pool, thread_callback_generic, userdata); + break; } - *thread_selected = parser_thread_data; - break; } cleanup: if(result != 0) am_free(thread_user_data); - amal_mutex_tryunlock(&self->mutex); - return result; -} - -static CHECK_RESULT bool amal_compiler_check_all_threads_done(amal_compiler *self) { - int i; - bool result; - result = bool_false; - - cleanup_if_error(amal_mutex_lock(&self->mutex, "amal_compiler_check_all_threads_done")); - for(i = 0; i < self->usable_thread_count; ++i) { - ParserThreadData *parser_thread_data; - parser_thread_data = &self->threads[i]; - if(parser_thread_data->status == PARSER_THREAD_STATUS_RUNNING) { - goto cleanup; - } - } - - result = bool_true; - cleanup: - amal_mutex_tryunlock(&self->mutex); - return result; -} - -static CHECK_RESULT int amal_compiler_load_file_join_threads(amal_compiler *self) { - int i; - int result; - void *thread_return_data; - ParserThreadData *parser_thread_data; - - assert(amal_thread_is_main()); - thread_return_data = NULL; - for(;;) { - bool done; - /* - Joining running threads. After checking one running thread another one might start up, - so this is mostly to wait for threads to finish and to sleep without doing work. - The check after that (amal_compiler_all_threads_done) check that all threads are done correctly - */ - for(i = 0; i < self->usable_thread_count; ++i) { - result = amal_mutex_lock(&self->mutex, "amal_compiler_load_file_join_threads, waiting for workers"); - parser_thread_data = &self->threads[i]; - amal_mutex_tryunlock(&self->mutex); - if(result != 0) - goto cleanup; - /* TODO: What to do if join fails? */ - ignore_result_int(parser_thread_data_join(parser_thread_data, &thread_return_data)); - if(thread_return_data != NULL) { - amal_log_error("Failed, waiting for jobs to finish"); - self->work_failed = bool_true; - } - } - - done = amal_compiler_check_all_threads_done(self); - if(done) - break; - } - - result = AMAL_COMPILER_OK; - cleanup: - if(self->work_failed) - result = AMAL_COMPILER_ERR; return result; } static CHECK_RESULT int amal_compiler_dispatch_generic(amal_compiler *self, ThreadWorkType work_type) { - Parser **parser; - Parser **parser_end; - parser = buffer_begin(&self->parsers); - parser_end = buffer_end(&self->parsers); - self->generic_work_object_index = 0; + Parser **parser = buffer_begin(&self->parsers); + Parser **parser_end = buffer_end(&self->parsers); for(; parser != parser_end; ++parser) { - ParserThreadData *thread_selected; ThreadWorkData thread_work_data; thread_work_data.type = work_type; thread_work_data.value.parser = *parser; - return_if_error(amal_compiler_select_thread_for_work(self, thread_work_data, &thread_selected)); - /* After all threads have been used, they will handle using the remaining parsers or stop if there is an error */ - if(!thread_selected) - break; + return_if_error(amal_compiler_add_task(self, thread_work_data)); } - return amal_compiler_load_file_join_threads(self); + return thread_pool_join_all_tasks(&self->stage_task_thread_pool) ? 0 : -1; } static CHECK_RESULT int amal_compiler_generate_program(amal_compiler *self) { @@ -498,10 +324,8 @@ static CHECK_RESULT int amal_compiler_generate_program(amal_compiler *self) { TODO: Copying the bytecode to the program can be done using multiple threads. Use self->threads for that. */ - Parser **parser; - Parser **parser_end; - parser = buffer_begin(&self->parsers); - parser_end = buffer_end(&self->parsers); + Parser **parser = buffer_begin(&self->parsers); + Parser **parser_end = buffer_end(&self->parsers); for(; parser != parser_end; ++parser) { return_if_error(amal_program_append_bytecode(self->program, &(*parser)->bytecode)); } @@ -580,15 +404,10 @@ static CHECK_RESULT int validate_main_func(FileScopeReference *main_file_scope, } int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath, FileScopeReference **file_scope) { - int result; - ParserThreadData *parser_thread_data; ThreadWorkData thread_work_data; bool main_job; bool new_entry; - if(self->work_failed) - return AMAL_COMPILER_WORK_FAIL_ABORT; - return_if_error(try_create_file_scope(self, filepath, file_scope, &new_entry)); assert(file_scope && *file_scope && (*file_scope)->canonical_path.data); if(!new_entry) { @@ -596,7 +415,6 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath, return 0; } - result = AMAL_COMPILER_ERR; thread_work_data.type = THREAD_WORK_PARSE; thread_work_data.value.file_scope = *file_scope; main_job = bool_false; @@ -607,7 +425,7 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath, main_job = bool_true; } - return_if_error(amal_compiler_select_thread_for_work(self, thread_work_data, &parser_thread_data)); + return_if_error(amal_compiler_add_task(self, thread_work_data)); if(main_job) { /*doc(Compiler flow) @@ -619,8 +437,8 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath, */ LhsExpr *main_func; - return_if_error(amal_compiler_load_file_join_threads(self)); - assert(amal_compiler_check_all_threads_done(self)); + if(!thread_pool_join_all_tasks(&self->stage_task_thread_pool)) + return -1; amal_log_info("Finished parsing all files, resolving AST"); return_if_error(validate_main_func(*file_scope, &main_func)); @@ -631,15 +449,12 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath, main_func->decl_flags |= DECL_FLAG_EXPORT; return_if_error(amal_compiler_dispatch_generic(self, THREAD_WORK_RESOLVE_AST)); - assert(amal_compiler_check_all_threads_done(self)); amal_log_info("Finished resolving AST, generating SSA"); return_if_error(amal_compiler_dispatch_generic(self, THREAD_WORK_GENERATE_SSA)); - assert(amal_compiler_check_all_threads_done(self)); amal_log_info("Finished generating SSA"); return_if_error(amal_compiler_dispatch_generic(self, THREAD_WORK_GENERATE_BYTECODE)); - assert(amal_compiler_check_all_threads_done(self)); amal_log_info("Finished generating bytecode"); return_if_error(amal_compiler_generate_program(self)); @@ -648,14 +463,5 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath, return AMAL_COMPILER_OK; } - if(parser_thread_data) - return AMAL_COMPILER_OK; - - cleanup_if_error(amal_mutex_lock(&self->mutex, "amal_compiler_load_file")); - cleanup_if_error(buffer_append(&self->queued_files, file_scope, sizeof(FileScopeReference*))); - result = AMAL_COMPILER_OK; - - cleanup: - amal_mutex_tryunlock(&self->mutex); - return result; + return 0; } diff --git a/src/parser.c b/src/parser.c index 03b835e..85165d7 100644 --- a/src/parser.c +++ b/src/parser.c @@ -30,31 +30,6 @@ static void parser_parse_var_type(Parser *self, VariableType *result); static void parser_parse_var_type_def(Parser *self, VariableType *result); static void parser_queue_file(Parser *self, BufferView path, FileScopeReference **file_scope); -int parser_thread_data_init(ParserThreadData *self) { - am_memset(&self->allocator, 0, sizeof(self->allocator)); - am_memset(&self->thread, 0, sizeof(self->thread)); - self->status = PARSER_THREAD_STATUS_NEW; - return arena_allocator_init(&self->allocator); -} - -void parser_thread_data_deinit(ParserThreadData *self) { - ignore_result_int(amal_thread_deinit(&self->thread)); - arena_allocator_deinit(&self->allocator); -} - -int parser_thread_data_start(ParserThreadData *self, AmalThreadCallbackFunc callback_func, void *userdata) { - return_if_error(amal_thread_deinit(&self->thread)); - return_if_error(amal_thread_create(&self->thread, AMAL_THREAD_JOINABLE, "Parser", callback_func, userdata)); - self->status = PARSER_THREAD_STATUS_RUNNING; - return 0; -} - -int parser_thread_data_join(ParserThreadData *self, void **result) { - if(self->status == PARSER_THREAD_STATUS_NEW) - return 0; - return amal_thread_join(&self->thread, result); -} - int parser_init(Parser *self, amal_compiler *compiler, ArenaAllocator *allocator) { self->allocator = allocator; self->compiler = compiler; diff --git a/src/program.c b/src/program.c index 17aee03..082f9fd 100644 --- a/src/program.c +++ b/src/program.c @@ -216,6 +216,8 @@ static CHECK_RESULT int amal_program_read_strings(amal_program *self) { if(bytes_left_to_read(self) < strings_size) return AMAL_PROGRAM_INVALID_STRINGS_SIZE; + am_free(self->string_indices); + self->string_indices = NULL; if(am_malloc(sizeof(u32) * self->num_strings, (void**)&self->string_indices) != 0) return AMAL_PROGRAM_ALLOC_FAILURE; string_index_ptr = self->string_indices; @@ -266,6 +268,8 @@ static CHECK_RESULT int amal_program_read_external_functions(amal_program *self) if(bytes_left_to_read(self) < extern_funcs_size) return AMAL_PROGRAM_INVALID_EXTERNAL_FUNCTIONS_SIZE; + am_free(self->extern_func_indices); + self->extern_func_indices = NULL; if(am_malloc(sizeof(u32) * self->num_extern_functions, (void**)&self->extern_func_indices) != 0) return AMAL_PROGRAM_ALLOC_FAILURE; extern_func_index_ptr = self->extern_func_indices; diff --git a/src/ssa/ssa.c b/src/ssa/ssa.c index 39b5a80..500555a 100644 --- a/src/ssa/ssa.c +++ b/src/ssa/ssa.c @@ -551,10 +551,8 @@ static CHECK_RESULT SsaRegister funccall_generate_ssa(FunctionCall *self, AstRes LhsExpr *func_lhs_expr; { - Ast **arg; - Ast **arg_end; - arg = buffer_begin(&self->args); - arg_end = buffer_end(&self->args); + Ast **arg = buffer_begin(&self->args); + Ast **arg_end = buffer_end(&self->args); for(; arg != arg_end; ++arg) { SsaRegister arg_reg; arg_reg = ast_generate_ssa(*arg, context); @@ -781,10 +779,8 @@ CHECK_RESULT SsaRegister scope_named_object_generate_ssa(ScopeNamedObject *self, } void scope_generate_ssa(Scope *self, SsaCompilerContext *context) { - Ast **ast; - Ast **ast_end; - ast = buffer_begin(&self->ast_objects); - ast_end = buffer_end(&self->ast_objects); + Ast **ast = buffer_begin(&self->ast_objects); + Ast **ast_end = buffer_end(&self->ast_objects); for(; ast != ast_end; ++ast) { ignore_result_int(ast_generate_ssa(*ast, context)); } diff --git a/src/std/alloc.c b/src/std/alloc.c index d07e94f..44a5105 100644 --- a/src/std/alloc.c +++ b/src/std/alloc.c @@ -15,7 +15,7 @@ int am_malloc(usize size, void **mem) { int am_realloc(void *mem, usize new_size, void **new_mem) { void *new_allocated_data = realloc(mem, new_size); - if(!new_allocated_data) { + if(!new_allocated_data && new_size != 0) { amal_log_error("am_malloc: failed to reallocate memory to size %lu", new_size); return ALLOC_FAIL; } diff --git a/src/std/arena_allocator.c b/src/std/arena_allocator.c index 4934925..11fb40d 100644 --- a/src/std/arena_allocator.c +++ b/src/std/arena_allocator.c @@ -32,10 +32,8 @@ int arena_allocator_init(ArenaAllocator *self) { } static void arena_allocator_deinit_buffers(ArenaAllocator *self) { - void **mem; - void **mems_end; - mem = buffer_begin(&self->mems); - mems_end = buffer_end(&self->mems); + void **mem = buffer_begin(&self->mems); + void **mems_end = buffer_end(&self->mems); while(mem != mems_end) { am_free(*mem); ++mem; diff --git a/src/std/log.c b/src/std/log.c index 88ff0cf..59e0319 100644 --- a/src/std/log.c +++ b/src/std/log.c @@ -12,7 +12,7 @@ static bool mutex_initialized = bool_false; /* Safe to call multiple times */ static void mutex_init() { if(!mutex_initialized) { - amal_mutex_init(&mutex); + ignore_result_int(amal_mutex_init(&mutex)); mutex_initialized = bool_true; } } diff --git a/src/std/thread.c b/src/std/thread.c index fb0e9ab..87362d2 100644 --- a/src/std/thread.c +++ b/src/std/thread.c @@ -107,13 +107,13 @@ int amal_thread_join(amal_thread *self, void **result) { return 0; } -void amal_mutex_init(amal_mutex *self) { - pthread_mutex_init(&self->mutex, NULL); +int amal_mutex_init(amal_mutex *self) { #ifdef AMAL_MUTEX_DEBUG self->lock_identifier = NULL; #endif self->locked = bool_false; self->owner_thread = 0; + return pthread_mutex_init(&self->mutex, NULL); } void amal_mutex_deinit(amal_mutex *self) { diff --git a/src/std/thread_pool.c b/src/std/thread_pool.c new file mode 100644 index 0000000..8f6e180 --- /dev/null +++ b/src/std/thread_pool.c @@ -0,0 +1,187 @@ +#include "../../include/std/thread_pool.h" +#include "../../include/std/alloc.h" +#include "../../include/std/mem.h" +#include "../../include/std/log.h" + +/* Sets @result to NULL if there are no available tasks */ +static CHECK_RESULT int thread_pool_take_task(amal_thread_pool *self, amal_thread_pool_task **result) { + *result = NULL; + cleanup_if_error(amal_mutex_lock(&self->task_select_mutex, "thread_pool_take_task")); + if(self->num_finished_queued_tasks < (int)buffer_get_size(&self->queued_tasks, amal_thread_pool_task) && !self->dead) + *result = buffer_get(&self->queued_tasks, self->num_finished_queued_tasks, sizeof(amal_thread_pool_task)); + cleanup: + amal_mutex_tryunlock(&self->task_select_mutex); + return 0; +} + +static void* thread_pool_thread_callback(void *userdata) { + amal_thread_pool_callback_data *thread_pool_data = userdata; + if(thread_pool_data->thread_pool->dead) + goto cleanup; + + if(thread_pool_data->callback(thread_pool_data->userdata) != 0) { + thread_pool_mark_dead(thread_pool_data->thread_pool); + goto cleanup; + } + + for(;;) { + amal_thread_pool_task *new_task; + cleanup_if_error(thread_pool_take_task(thread_pool_data->thread_pool, &new_task)); + if(!new_task) + break; + + if(new_task->callback(new_task->userdata) != 0) { + thread_pool_mark_dead(thread_pool_data->thread_pool); + goto cleanup; + } + } + + cleanup: + thread_pool_data->thread_pool_thread->status = THREAD_POOL_THREAD_STATUS_IDLE; + am_free(thread_pool_data); + return NULL; +} + +static void thread_pool_thread_init(amal_thread_pool_thread *self) { + am_memset(&self->thread, 0, sizeof(self->thread)); + self->status = THREAD_POOL_THREAD_STATUS_NEW; +} + +static void thread_pool_thread_deinit(amal_thread_pool_thread *self) { + ignore_result_int(amal_thread_deinit(&self->thread)); +} + +static CHECK_RESULT int thread_pool_thread_start(amal_thread_pool_thread *self, amal_thread_pool *thread_pool, amal_thread_job_callback callback, void *userdata) { + amal_thread_pool_callback_data *callback_data; + return_if_error(am_malloc(sizeof(amal_thread_pool_callback_data), (void**)&callback_data)); + callback_data->thread_pool = thread_pool; + callback_data->thread_pool_thread = self; + callback_data->callback = callback; + callback_data->userdata = userdata; + + return_if_error(amal_thread_deinit(&self->thread)); + return_if_error(amal_thread_create(&self->thread, AMAL_THREAD_JOINABLE, "thread_pool_thread_start", thread_pool_thread_callback, callback_data)); + self->status = THREAD_POOL_THREAD_STATUS_RUNNING; + return 0; +} + +static CHECK_RESULT int thread_pool_thread_join(amal_thread_pool_thread *self) { + if(self->status == THREAD_POOL_THREAD_STATUS_NEW) + return 0; + return amal_thread_join(&self->thread, NULL); +} + +int thread_pool_init(amal_thread_pool *self, int num_threads) { + int i; + self->num_threads = num_threads != 0 ? num_threads : amal_get_usable_thread_count(); + if(self->num_threads == 0) { + amal_log_warning("Unable to get the number of threads available on the system, using 1 thread."); + self->num_threads = 1; + } + + self->dead = bool_false; + self->num_finished_queued_tasks = 0; + self->threads = NULL; + + ignore_result_int(buffer_init(&self->queued_tasks, NULL)); + return_if_error(amal_mutex_init(&self->task_select_mutex)); + cleanup_if_error(am_malloc(self->num_threads * sizeof(amal_thread_pool_thread), (void**)&self->threads)); + for(i = 0; i < self->num_threads; ++i) + thread_pool_thread_init(&self->threads[i]); + return 0; + + cleanup: + am_free(self->threads); + self->num_threads = 0; + return -1; +} + +void thread_pool_deinit(amal_thread_pool *self) { + if(self->threads) { + int i; + for(i = 0; i < self->num_threads; ++i) + thread_pool_thread_deinit(&self->threads[i]); + am_free(self->threads); + } + amal_mutex_deinit(&self->task_select_mutex); + buffer_deinit(&self->queued_tasks); +} + +int thread_pool_add_task(amal_thread_pool *self, amal_thread_job_callback callback, void *userdata) { + int i; + bool found_available_thread = bool_false; + int result = -1; + + if(self->dead) + return result; + + cleanup_if_error(amal_mutex_lock(&self->task_select_mutex, "thread_pool_add_task")); + for(i = 0; i < self->num_threads; ++i) { + amal_thread_pool_thread *thread = &self->threads[i]; + if(thread->status != THREAD_POOL_THREAD_STATUS_RUNNING) { + cleanup_if_error(thread_pool_thread_start(thread, self, callback, userdata)); + found_available_thread = bool_true; + break; + } + } + + if(!found_available_thread) { + amal_thread_pool_task task; + task.callback = callback; + task.userdata = userdata; + cleanup_if_error(buffer_append(&self->queued_tasks, &task, sizeof(task))); + } + + result = 0; + cleanup: + amal_mutex_tryunlock(&self->task_select_mutex); + return result; +} + +bool thread_pool_join_all_tasks(amal_thread_pool *self) { + bool died; + for(;;) { + /* + Joining running threads. After checking one running thread another one might start up, + so this is mostly to wait for threads to finish and to sleep without doing work. + The check after that (thread_pool_check_threads_finished) check that all threads have finished correctly + */ + int i; + bool finished = bool_true; + for(i = 0; i < self->num_threads; ++i) { + amal_thread_pool_thread_status thread_status; + if(amal_mutex_lock(&self->task_select_mutex, "thread_pool_join_all_tasks") != 0) + thread_pool_mark_dead(self); + thread_status = self->threads[i].status; + amal_mutex_tryunlock(&self->task_select_mutex); + /* TODO: What to do if join fails? */ + switch(thread_status) { + case THREAD_POOL_THREAD_STATUS_NEW: + break; + case THREAD_POOL_THREAD_STATUS_RUNNING: + finished = bool_false; + /* fallthrough */ + case THREAD_POOL_THREAD_STATUS_IDLE: + ignore_result_int(thread_pool_thread_join(&self->threads[i])); + break; + } + } + + if(finished) + break; + } + + died = self->dead; + self->dead = bool_false; + buffer_clear(&self->queued_tasks); + self->num_finished_queued_tasks = 0; + return !died; +} + +void thread_pool_mark_dead(amal_thread_pool *self) { + self->dead = bool_true; +} + +BufferView thread_pool_get_threads(amal_thread_pool *self) { + return create_buffer_view((const char*)self->threads, self->num_threads * sizeof(amal_thread_pool_thread)); +} |