From d6f368a3f400fea3e89280262a8147e7ce5d855c Mon Sep 17 00:00:00 2001 From: dec05eba Date: Thu, 22 Aug 2019 00:59:49 +0200 Subject: Move thread work from compiler/parser to thread_work file, fix use after free bug in multithreaded parser allocator --- README.md | 12 +- executor/x86_64/asm.c | 2 +- include/compiler.h | 9 +- include/defs.h | 1 - include/parser.h | 17 -- include/std/buffer.h | 2 +- include/std/thread.h | 2 +- include/std/thread_pool.h | 71 +++++++++ src/ast.c | 9 +- src/compiler.c | 390 ++++++++++++---------------------------------- src/parser.c | 25 --- src/program.c | 4 + src/ssa/ssa.c | 12 +- src/std/alloc.c | 2 +- src/std/arena_allocator.c | 6 +- src/std/log.c | 2 +- src/std/thread.c | 4 +- src/std/thread_pool.c | 187 ++++++++++++++++++++++ tests/bytecode.amal | 2 + 19 files changed, 389 insertions(+), 370 deletions(-) create mode 100644 include/std/thread_pool.h create mode 100644 src/std/thread_pool.c diff --git a/README.md b/README.md index 4a1f0b5..6dc4086 100644 --- a/README.md +++ b/README.md @@ -31,10 +31,14 @@ Verify all code execution paths in a function return a value, if the function ex Show compile error if the result of a function call is ignored.\ Show compile error if function result type and assigned to variable have different types.\ Show compile error if variables are assigned to but not used.\ -Push arguments in reverse order (right-to-left, cdecl) (in program.c, since on windows we will need to support stdcall which is left-to-right). -## Urgent -Simplify src/compiler.c, it's pretty complex with the thread work done right now. The thread work should be put in a -thread dispatch file that only handles thread job dispatching. +Push arguments in reverse order (right-to-left, cdecl) (in program.c, since on windows we will need to support stdcall which is left-to-right).\ +After tokenizing files, unload file data to disk. Otherwise large programs (as large as chromium) will need gigabytes of ram to compile. # Documents Documents are located under doc. The file doc/Documentation.md is generated from source files by running doc/doc_extract.py but there is no need to run this script unless you are modifying documentation in the source. +# Development +Define `AMAL_MUTEX_DEBUG` in src/thread.c to enable mutex debugging.\ +Set `PEDANTIC` environment variable to 1 before compiling to enable pedantic mode.\ +Set `SANITIZE_ADDRESS` environment variable to 1 before compiling to run compile with asan.\ +Set `SANITIZE_THREAD` environment variable to 1 before compiling to run compile with tsan.\ +Set `SCAN_BUILD` environment variable to 1 before compiling to run scan-build on the source files. \ No newline at end of file diff --git a/executor/x86_64/asm.c b/executor/x86_64/asm.c index 606e539..c741f15 100644 --- a/executor/x86_64/asm.c +++ b/executor/x86_64/asm.c @@ -315,7 +315,7 @@ static void asm_rm(Asm *self, AsmPtr *mem, Reg64 reg) { } *self->code_it++ = (reg << 3) | rm_byte; - /* RSP requires SIB byte when displacement is not 0 */ + /* RSP requires SIB byte */ if(mem->base == RSP) *self->code_it++ = 0x24; } diff --git a/include/compiler.h b/include/compiler.h index 08b74b8..ac50d28 100644 --- a/include/compiler.h +++ b/include/compiler.h @@ -5,7 +5,7 @@ #include "std/buffer.h" #include "std/buffer_view.h" #include "std/arena_allocator.h" -#include "std/thread.h" +#include "std/thread_pool.h" #include "compiler_options.h" #include "ast.h" #include "program.h" @@ -49,20 +49,17 @@ struct amal_compiler { ArenaAllocator allocator; Scope root_scope; Buffer/**/ parsers; - Buffer/**/ queued_files; HashMapType(BufferView, FileScopeReference*) file_scopes; - ParserThreadData *threads; - int usable_thread_count; bool started; - bool work_failed; amal_mutex mutex; - int generic_work_object_index; + amal_thread_pool stage_task_thread_pool; }; void amal_compiler_options_init(amal_compiler_options *self); /* If @options is NULL, then default values are used. + @options are copied. You should run @amal_program_deinit even @amal_compiler_load_file fails, to cleanup memory. This function creates a copy of @filepath so it doesn't have to survive longer than this function call. The file has to be in utf-8 format and it can optionally have utf-8 BOM. diff --git a/include/defs.h b/include/defs.h index 8cd9d39..c8db820 100644 --- a/include/defs.h +++ b/include/defs.h @@ -1,7 +1,6 @@ #ifndef AMALGAM_DEFS_H #define AMALGAM_DEFS_H -typedef struct ParserThreadData ParserThreadData; typedef struct amal_compiler amal_compiler; typedef struct Parser Parser; typedef struct Scope Scope; diff --git a/include/parser.h b/include/parser.h index 4427cc8..81895e1 100644 --- a/include/parser.h +++ b/include/parser.h @@ -16,18 +16,6 @@ #define PARSER_ERR -1 #define PARSER_UNEXPECTED_TOKEN -2 -typedef enum { - PARSER_THREAD_STATUS_NEW, - PARSER_THREAD_STATUS_IDLE, - PARSER_THREAD_STATUS_RUNNING -} ParserThreadStatus; - -struct ParserThreadData { - amal_thread thread; - ParserThreadStatus status; - ArenaAllocator allocator; -}; - typedef enum { ERROR_CONTEXT_NONE, ERROR_CONTEXT_RHS_STANDALONE, @@ -51,11 +39,6 @@ struct Parser { Bytecode bytecode; }; -CHECK_RESULT int parser_thread_data_init(ParserThreadData *self); -void parser_thread_data_deinit(ParserThreadData *self); -CHECK_RESULT int parser_thread_data_start(ParserThreadData *self, AmalThreadCallbackFunc callback_func, void *userdata); -CHECK_RESULT int parser_thread_data_join(ParserThreadData *self, void **result); - CHECK_RESULT int parser_init(Parser *self, amal_compiler *compiler, ArenaAllocator *allocator); /* diff --git a/include/std/buffer.h b/include/std/buffer.h index ac722b1..d194881 100644 --- a/include/std/buffer.h +++ b/include/std/buffer.h @@ -30,7 +30,7 @@ CHECK_RESULT int buffer_append_empty(Buffer *self, usize size); CHECK_RESULT int buffer_expand(Buffer *self, usize size); void* buffer_get(Buffer *self, usize index, usize type_size); CHECK_RESULT int buffer_pop(Buffer *self, void *data, usize size); -/* Set buffer size to 0, doesn't reallocate */ +/* Set buffer size to 0, doesn't change the capacity */ void buffer_clear(Buffer *self); void* buffer_begin(Buffer *self); void* buffer_end(Buffer *self); diff --git a/include/std/thread.h b/include/std/thread.h index 356ebf0..2765204 100644 --- a/include/std/thread.h +++ b/include/std/thread.h @@ -47,7 +47,7 @@ bool amal_thread_is_main(); /* Returns 0 if the number of usable threads is unknown */ int amal_get_usable_thread_count(); -void amal_mutex_init(amal_mutex *self); +CHECK_RESULT int amal_mutex_init(amal_mutex *self); void amal_mutex_deinit(amal_mutex *self); CHECK_RESULT int amal_mutex_lock(amal_mutex *self, const char *lock_identifier); /* Safe to call unlock when another thread owns the lock or if the lock is not locked */ diff --git a/include/std/thread_pool.h b/include/std/thread_pool.h new file mode 100644 index 0000000..f8acf83 --- /dev/null +++ b/include/std/thread_pool.h @@ -0,0 +1,71 @@ +#ifndef AMAL_THREAD_POOL_H +#define AMAL_THREAD_POOL_H + +#include "thread.h" +#include "buffer.h" +#include "buffer_view.h" + +/* + Return 0 if there is no error, otherwise return a non-0 value. + If the result is not 0, then all additional tasks will be stopped. +*/ +typedef int(*amal_thread_job_callback)(void *userdata); + +typedef enum { + THREAD_POOL_THREAD_STATUS_NEW, + THREAD_POOL_THREAD_STATUS_IDLE, + THREAD_POOL_THREAD_STATUS_RUNNING +} amal_thread_pool_thread_status; + +typedef struct { + amal_thread thread; + amal_thread_pool_thread_status status; +} amal_thread_pool_thread; + +typedef struct { + amal_thread_job_callback callback; + void *userdata; +} amal_thread_pool_task; + +typedef struct { + int num_threads; + amal_thread_pool_thread *threads; /* Size of @threads is @num_threads */ + amal_mutex task_select_mutex; + Buffer queued_tasks; + bool dead; + int num_finished_queued_tasks; +} amal_thread_pool; + +typedef struct { + amal_thread_pool *thread_pool; + amal_thread_pool_thread *thread_pool_thread; + amal_thread_job_callback callback; + void *userdata; +} amal_thread_pool_callback_data; + +/* + If @num_threads is 0, then the number of threads selected will the number + of physical threads, or 1 +*/ +CHECK_RESULT int thread_pool_init(amal_thread_pool *self, int num_threads); +void thread_pool_deinit(amal_thread_pool *self); +/* + Thread-safe. Will fail if one task has failed, in which case it can only be used + once @thread_pool_join_all_tasks has been called +*/ +CHECK_RESULT int thread_pool_add_task(amal_thread_pool *self, amal_thread_job_callback callback, void *userdata); +/* + Wait until all tasks have finished. Should only be called from one thread. + Returns true if all tasks finished without any issues (if @thread_pool_mark_dead was not called) +*/ +CHECK_RESULT bool thread_pool_join_all_tasks(amal_thread_pool *self); +/* + Stop tasks as soon as possible and stop dispatch of new tasks. + This can be used when work on one thread has failed and you want to + stop tasks on all threads to report errors and stop additional tasks. +*/ +void thread_pool_mark_dead(amal_thread_pool *self); + +BufferView/**/ thread_pool_get_threads(amal_thread_pool *self); + +#endif diff --git a/src/ast.c b/src/ast.c index 1a44ce1..cb4d875 100644 --- a/src/ast.c +++ b/src/ast.c @@ -421,13 +421,10 @@ int scope_add_child(Scope *self, Ast *child) { } void scope_resolve(Scope *self, AstCompilerContext *context) { - Ast **ast; - Ast **ast_end; - Scope *prev_scope; + Ast **ast = buffer_begin(&self->ast_objects); + Ast **ast_end = buffer_end(&self->ast_objects); + Scope *prev_scope = context->scope; - ast = buffer_begin(&self->ast_objects); - ast_end = buffer_end(&self->ast_objects); - prev_scope = context->scope; context->scope = self; for(; ast != ast_end; ++ast) { ast_resolve(*ast, context); diff --git a/src/compiler.c b/src/compiler.c index 803515b..fd9e4ae 100644 --- a/src/compiler.c +++ b/src/compiler.c @@ -25,37 +25,40 @@ static usize strnlen(const char *str, usize max_length) { } /* TODO: Allow to specify size and members? */ -static CHECK_RESULT int create_default_type(amal_compiler *compiler, const char *name, LhsExpr **lhs_expr) { +static CHECK_RESULT int create_default_type(amal_compiler *compiler, const char *name, amal_default_type **default_type) { StructDecl *struct_decl; Ast *expr; + LhsExpr *lhs_expr; return_if_error(arena_allocator_alloc(&compiler->allocator, sizeof(StructDecl), (void**)&struct_decl)); return_if_error(structdecl_init(struct_decl, &compiler->root_scope, &compiler->allocator)); - return_if_error(arena_allocator_alloc(&compiler->allocator, sizeof(LhsExpr), (void**)lhs_expr)); - lhsexpr_init(*lhs_expr, DECL_FLAG_EXTERN | DECL_FLAG_PUB | DECL_FLAG_CONST, create_buffer_view(name, strnlen(name, PATH_MAX))); - return_if_error(ast_create(&compiler->allocator, struct_decl, AST_STRUCT_DECL, &(*lhs_expr)->rhs_expr)); - return_if_error(ast_create(&compiler->allocator, *lhs_expr, AST_LHS, &expr)); + return_if_error(arena_allocator_alloc(&compiler->allocator, sizeof(amal_default_type), (void**)default_type)); + lhs_expr = &(*default_type)->lhs_expr; + + lhsexpr_init(lhs_expr, DECL_FLAG_EXTERN | DECL_FLAG_PUB | DECL_FLAG_CONST, create_buffer_view(name, strnlen(name, PATH_MAX))); + return_if_error(ast_create(&compiler->allocator, struct_decl, AST_STRUCT_DECL, &lhs_expr->rhs_expr)); + return_if_error(ast_create(&compiler->allocator, lhs_expr, AST_LHS, &expr)); expr->resolve_data.type.type = RESOLVED_TYPE_LHS_EXPR; - expr->resolve_data.type.value.lhs_expr = *lhs_expr; + expr->resolve_data.type.value.lhs_expr = lhs_expr; expr->resolve_data.status = AST_RESOLVED; return scope_add_child(&compiler->root_scope, expr); } static CHECK_RESULT int init_default_types(amal_compiler *compiler) { - return_if_error(create_default_type(compiler, "i8", (LhsExpr**)&compiler->default_types.i8)); - return_if_error(create_default_type(compiler, "i16", (LhsExpr**)&compiler->default_types.i16)); - return_if_error(create_default_type(compiler, "i32", (LhsExpr**)&compiler->default_types.i32)); - return_if_error(create_default_type(compiler, "i64", (LhsExpr**)&compiler->default_types.i64)); - return_if_error(create_default_type(compiler, "u8", (LhsExpr**)&compiler->default_types.u8)); - return_if_error(create_default_type(compiler, "u16", (LhsExpr**)&compiler->default_types.u16)); - return_if_error(create_default_type(compiler, "u32", (LhsExpr**)&compiler->default_types.u32)); - return_if_error(create_default_type(compiler, "u64", (LhsExpr**)&compiler->default_types.u64)); - return_if_error(create_default_type(compiler, "isize", (LhsExpr**)&compiler->default_types.isize)); - return_if_error(create_default_type(compiler, "usize", (LhsExpr**)&compiler->default_types.usize)); - return_if_error(create_default_type(compiler, "f32", (LhsExpr**)&compiler->default_types.f32)); - return_if_error(create_default_type(compiler, "f64", (LhsExpr**)&compiler->default_types.f64)); - return_if_error(create_default_type(compiler, "str", (LhsExpr**)&compiler->default_types.str)); + return_if_error(create_default_type(compiler, "i8", &compiler->default_types.i8)); + return_if_error(create_default_type(compiler, "i16", &compiler->default_types.i16)); + return_if_error(create_default_type(compiler, "i32", &compiler->default_types.i32)); + return_if_error(create_default_type(compiler, "i64", &compiler->default_types.i64)); + return_if_error(create_default_type(compiler, "u8", &compiler->default_types.u8)); + return_if_error(create_default_type(compiler, "u16", &compiler->default_types.u16)); + return_if_error(create_default_type(compiler, "u32", &compiler->default_types.u32)); + return_if_error(create_default_type(compiler, "u64", &compiler->default_types.u64)); + return_if_error(create_default_type(compiler, "isize", &compiler->default_types.isize)); + return_if_error(create_default_type(compiler, "usize", &compiler->default_types.usize)); + return_if_error(create_default_type(compiler, "f32", &compiler->default_types.f32)); + return_if_error(create_default_type(compiler, "f64", &compiler->default_types.f64)); + return_if_error(create_default_type(compiler, "str", &compiler->default_types.str)); compiler->default_types.arithmetic_types[0] = compiler->default_types.i8; compiler->default_types.arithmetic_types[1] = compiler->default_types.u8; @@ -86,11 +89,10 @@ static CHECK_RESULT int init_default_types(amal_compiler *compiler) { bool is_arithmetic_type(LhsExpr *expr, amal_compiler *compiler) { usize i; - const amal_default_types *default_types; - default_types = &compiler->default_types; + const amal_default_types *default_types = &compiler->default_types; for(i = 0; i < NUM_ARITHMETIC_TYPES; ++i) { - if(expr == (LhsExpr*)default_types->arithmetic_types[i]) + if(expr == &default_types->arithmetic_types[i]->lhs_expr) return bool_true; } return bool_false; @@ -103,19 +105,6 @@ void amal_compiler_options_init(amal_compiler_options *self) { } static CHECK_RESULT int amal_compiler_init(amal_compiler *self, const amal_compiler_options *options, amal_program *program) { - int i; - - self->usable_thread_count = options ? options->num_threads : 0; - if(self->usable_thread_count == 0) { - self->usable_thread_count = amal_get_usable_thread_count(); - if(self->usable_thread_count == 0) { - amal_log_warning("Unable to get the number of threads available on the system, using 1 thread."); - amal_log_warning("You can override the number of threads used by setting compiler option for number of thread to use."); - self->usable_thread_count = 1; - } - } - amal_log_info("Using %d threads", self->usable_thread_count); - am_memset(&self->allocator, 0, sizeof(self->allocator)); am_memset(&self->root_scope, 0, sizeof(self->root_scope)); if(options) @@ -124,20 +113,13 @@ static CHECK_RESULT int amal_compiler_init(amal_compiler *self, const amal_compi amal_compiler_options_init(&self->options); self->program = program; self->started = bool_false; - self->work_failed = bool_false; - self->generic_work_object_index = 0; - amal_mutex_init(&self->mutex); + return_if_error(thread_pool_init(&self->stage_task_thread_pool, options ? options->num_threads : 0)); + return_if_error(amal_mutex_init(&self->mutex)); return_if_error(arena_allocator_init(&self->allocator)); cleanup_if_error(scope_init(&self->root_scope, NULL, &self->allocator)); cleanup_if_error(buffer_init(&self->parsers, &self->allocator)); - cleanup_if_error(buffer_init(&self->queued_files, &self->allocator)); cleanup_if_error(hash_map_init(&self->file_scopes, &self->allocator, sizeof(FileScopeReference*), hash_map_compare_string, amal_hash_string)); - cleanup_if_error(arena_allocator_alloc(&self->allocator, - self->usable_thread_count * sizeof(ParserThreadData), - (void**)&self->threads)); - for(i = 0; i < self->usable_thread_count; ++i) - cleanup_if_error(parser_thread_data_init(&self->threads[i])); cleanup_if_error(init_default_types(self)); return AMAL_COMPILER_OK; @@ -147,11 +129,13 @@ static CHECK_RESULT int amal_compiler_init(amal_compiler *self, const amal_compi } void amal_compiler_deinit(amal_compiler *self) { - int i; - for(i = 0; i < self->usable_thread_count; ++i) { - parser_thread_data_deinit(&self->threads[i]); + Parser **parser = buffer_begin(&self->parsers); + Parser **parser_end = buffer_end(&self->parsers); + for(; parser != parser_end; ++parser) { + if((*parser)->allocator) + arena_allocator_deinit((*parser)->allocator); } - + thread_pool_deinit(&self->stage_task_thread_pool); amal_mutex_deinit(&self->mutex); arena_allocator_deinit(&self->allocator); } @@ -165,13 +149,11 @@ typedef enum { typedef struct { amal_compiler *compiler; - ParserThreadData *parser_thread_data; FileScopeReference *file_scope; } CompilerParserThreadUserData; typedef struct { amal_compiler *compiler; - ParserThreadData *parser_thread_data; Parser *parser; ThreadWorkType work_type; } CompilerGenericThreadUserData; @@ -184,19 +166,25 @@ typedef struct { ThreadWorkType type; } ThreadWorkData; -static CHECK_RESULT int amal_compiler_load_in_this_thread(amal_compiler *compiler, FileScopeReference *file_scope, ArenaAllocator *allocator) { +static CHECK_RESULT int amal_compiler_load_in_this_thread(amal_compiler *compiler, FileScopeReference *file_scope) { Parser *parser; int result; BufferView filepath; + ArenaAllocator *parser_allocator; result = AMAL_COMPILER_ERR; + cleanup_if_error(amal_mutex_lock(&compiler->mutex, "amal_compiler_load_in_this_thread, create arena allocator")); + return_if_error(arena_allocator_alloc(&compiler->allocator, sizeof(ArenaAllocator), (void**)&parser_allocator)); + amal_mutex_tryunlock(&compiler->mutex); + return_if_error(arena_allocator_init(parser_allocator)); + filepath = create_buffer_view(file_scope->canonical_path.data, file_scope->canonical_path.size); amal_log_info("Started parsing %.*s", filepath.size, filepath.data); - return_if_error(arena_allocator_alloc(allocator, sizeof(Parser), (void**)&parser)); - return_if_error(parser_init(parser, compiler, allocator)); + return_if_error(arena_allocator_alloc(parser_allocator, sizeof(Parser), (void**)&parser)); + return_if_error(parser_init(parser, compiler, parser_allocator)); file_scope->parser = parser; return_if_error(parser_parse_file(parser, filepath)); - cleanup_if_error(amal_mutex_lock(&compiler->mutex, "amal_compiler_load_in_this_thread")); + cleanup_if_error(amal_mutex_lock(&compiler->mutex, "amal_compiler_load_in_this_thread, add parser")); cleanup_if_error(buffer_append(&compiler->parsers, &parser, sizeof(parser))); amal_log_info("Finished parsing %.*s", filepath.size, filepath.data); result = AMAL_COMPILER_OK; @@ -206,51 +194,12 @@ static CHECK_RESULT int amal_compiler_load_in_this_thread(amal_compiler *compile return result; } -static void* thread_callback_parse_file(void *userdata) { - FileScopeReference *file_scope; - CompilerParserThreadUserData compiler_parser_userdata; - void *result; - assert(!amal_thread_is_main()); - - am_memcpy(&compiler_parser_userdata, userdata, sizeof(compiler_parser_userdata)); +static int thread_callback_parse_file(void *userdata) { + int result; + CompilerParserThreadUserData *compiler_parser_userdata = userdata; + result = amal_compiler_load_in_this_thread(compiler_parser_userdata->compiler, + compiler_parser_userdata->file_scope); am_free(userdata); - file_scope = compiler_parser_userdata.file_scope; - result = (void*)AMAL_COMPILER_ERR; - for(;;) { - int has_next; - /* Abort job if another job failed */ - if(compiler_parser_userdata.compiler->work_failed) { - result = (void*)AMAL_COMPILER_WORK_FAIL_ABORT; - goto cleanup; - } - cleanup_if_error(amal_compiler_load_in_this_thread(compiler_parser_userdata.compiler, - file_scope, - &compiler_parser_userdata.parser_thread_data->allocator)); - cleanup_if_error(amal_mutex_lock(&compiler_parser_userdata.compiler->mutex, "thread_callback_parse_file")); - has_next = buffer_pop(&compiler_parser_userdata.compiler->queued_files, &file_scope, sizeof(FileScopeReference*)); - amal_mutex_tryunlock(&compiler_parser_userdata.compiler->mutex); - if(has_next != 0) - break; - } - result = NULL; - - cleanup: - /* - To stop all other parsers from working cleanly, we simply clear the file queue, - and the other threads will stop when they are done with the file they are currently parsing. - */ - if(result != NULL) { - ignore_result_int(amal_mutex_lock(&compiler_parser_userdata.compiler->mutex, "thread_callback_parse_file")); - buffer_clear(&compiler_parser_userdata.compiler->queued_files); - } - /* - There can be a data race between writing to this and when this is read in @amal_compiler_load_file_join_threads. - This is intentional and it's ok, because the join_threads function checks against this status is guaranteed at this point - to not be a certain value that it checks for. Writing to an int is atomic. - TODO: Verify this is ok on all platforms. - */ - compiler_parser_userdata.parser_thread_data->status = PARSER_THREAD_STATUS_IDLE; - amal_mutex_tryunlock(&compiler_parser_userdata.compiler->mutex); return result; } @@ -302,195 +251,72 @@ static CHECK_RESULT int thread_generate_bytecode(Parser *parser) { return result; } -/* TODO: Handle errors (stop work in all other threads and report errors/warnings) */ -static void* thread_callback_generic(void *userdata) { - CompilerGenericThreadUserData compiler_userdata; - Parser *parser; - void *result; - assert(!amal_thread_is_main()); - - am_memcpy(&compiler_userdata, userdata, sizeof(compiler_userdata)); - am_free(userdata); - parser = compiler_userdata.parser; - result = (void*)AMAL_COMPILER_ERR; - for(;;) { - /* TODO: stop work in all other threads on failure */ - switch(compiler_userdata.work_type) { - case THREAD_WORK_PARSE: { - assert(bool_false && "Thread work type can't ge 'parse' for generic work"); - break; - } - case THREAD_WORK_RESOLVE_AST: - cleanup_if_error(thread_resolve_ast(compiler_userdata.compiler, parser)); - break; - case THREAD_WORK_GENERATE_SSA: - cleanup_if_error(thread_generate_ssa(parser)); - break; - case THREAD_WORK_GENERATE_BYTECODE: - cleanup_if_error(thread_generate_bytecode(parser)); - break; - } - - /* Abort job if another job failed */ - if(compiler_userdata.compiler->work_failed) { - result = (void*)AMAL_COMPILER_WORK_FAIL_ABORT; - goto cleanup; +static int thread_callback_generic(void *userdata) { + int result; + CompilerGenericThreadUserData *compiler_userdata = userdata; + switch(compiler_userdata->work_type) { + case THREAD_WORK_PARSE: { + assert(bool_false && "Thread work type can't be 'parse' for generic work"); + break; } - - /* Find next job */ - cleanup_if_error(amal_mutex_lock(&compiler_userdata.compiler->mutex, "thread_callback_generic")); - if(compiler_userdata.compiler->generic_work_object_index + 1 >= (int)buffer_get_size(&compiler_userdata.compiler->parsers, Parser*)) + case THREAD_WORK_RESOLVE_AST: + result = thread_resolve_ast(compiler_userdata->compiler, compiler_userdata->parser); + break; + case THREAD_WORK_GENERATE_SSA: + result = thread_generate_ssa(compiler_userdata->parser); + break; + case THREAD_WORK_GENERATE_BYTECODE: + result = thread_generate_bytecode(compiler_userdata->parser); break; - ++compiler_userdata.compiler->generic_work_object_index; - parser = *(Parser**)buffer_get(&compiler_userdata.compiler->parsers, compiler_userdata.compiler->generic_work_object_index, sizeof(parser)); - amal_mutex_tryunlock(&compiler_userdata.compiler->mutex); - } - result = NULL; - - cleanup: - /* - To stop all other worker threads cleanly, we simply say we are done with all work in the queue, - and the other threads will stop when they are done with the work they are currently working on. - */ - if(result != NULL) { - ignore_result_int(amal_mutex_lock(&compiler_userdata.compiler->mutex, "thread_callback_generic")); - compiler_userdata.compiler->generic_work_object_index = (int)buffer_get_size(&compiler_userdata.compiler->parsers, Parser*); } - compiler_userdata.parser_thread_data->status = PARSER_THREAD_STATUS_IDLE; - amal_mutex_tryunlock(&compiler_userdata.compiler->mutex); + am_free(userdata); return result; } -static CHECK_RESULT int amal_compiler_select_thread_for_work(amal_compiler *self, ThreadWorkData work_data, ParserThreadData **thread_selected) { - int i; - int result; - ParserThreadData *parser_thread_data; - void *thread_user_data; - thread_user_data = NULL; - *thread_selected = NULL; - result = AMAL_COMPILER_OK; - - cleanup_if_error(amal_mutex_lock(&self->mutex, "amal_compiler_select_thread_for_work")); - for(i = 0; i < self->usable_thread_count; ++i) { - parser_thread_data = &self->threads[i]; - if(parser_thread_data->status == PARSER_THREAD_STATUS_RUNNING) - continue; - - switch(work_data.type) { - case THREAD_WORK_PARSE: { - CompilerParserThreadUserData *userdata; - cleanup_if_error(am_malloc(sizeof(CompilerParserThreadUserData), (void**)&userdata)); - thread_user_data = userdata; - userdata->compiler = self; - userdata->parser_thread_data = parser_thread_data; - userdata->file_scope = work_data.value.file_scope; - result = parser_thread_data_start(parser_thread_data, thread_callback_parse_file, userdata); - break; - } - case THREAD_WORK_RESOLVE_AST: - case THREAD_WORK_GENERATE_SSA: - case THREAD_WORK_GENERATE_BYTECODE: { - CompilerGenericThreadUserData *userdata; - cleanup_if_error(am_malloc(sizeof(CompilerGenericThreadUserData), (void**)&userdata)); - thread_user_data = userdata; - userdata->compiler = self; - userdata->parser_thread_data = parser_thread_data; - userdata->parser = work_data.value.parser; - userdata->work_type = work_data.type; - ++self->generic_work_object_index; - result = parser_thread_data_start(parser_thread_data, thread_callback_generic, userdata); - break; - } +static CHECK_RESULT int amal_compiler_add_task(amal_compiler *self, ThreadWorkData work_data) { + void *thread_user_data = NULL; + int result = AMAL_COMPILER_OK; + + switch(work_data.type) { + case THREAD_WORK_PARSE: { + CompilerParserThreadUserData *userdata; + cleanup_if_error(am_malloc(sizeof(CompilerParserThreadUserData), (void**)&userdata)); + thread_user_data = userdata; + userdata->compiler = self; + userdata->file_scope = work_data.value.file_scope; + result = thread_pool_add_task(&self->stage_task_thread_pool, thread_callback_parse_file, userdata); + break; + } + case THREAD_WORK_RESOLVE_AST: + case THREAD_WORK_GENERATE_SSA: + case THREAD_WORK_GENERATE_BYTECODE: { + CompilerGenericThreadUserData *userdata; + cleanup_if_error(am_malloc(sizeof(CompilerGenericThreadUserData), (void**)&userdata)); + thread_user_data = userdata; + userdata->compiler = self; + userdata->parser = work_data.value.parser; + userdata->work_type = work_data.type; + result = thread_pool_add_task(&self->stage_task_thread_pool, thread_callback_generic, userdata); + break; } - *thread_selected = parser_thread_data; - break; } cleanup: if(result != 0) am_free(thread_user_data); - amal_mutex_tryunlock(&self->mutex); - return result; -} - -static CHECK_RESULT bool amal_compiler_check_all_threads_done(amal_compiler *self) { - int i; - bool result; - result = bool_false; - - cleanup_if_error(amal_mutex_lock(&self->mutex, "amal_compiler_check_all_threads_done")); - for(i = 0; i < self->usable_thread_count; ++i) { - ParserThreadData *parser_thread_data; - parser_thread_data = &self->threads[i]; - if(parser_thread_data->status == PARSER_THREAD_STATUS_RUNNING) { - goto cleanup; - } - } - - result = bool_true; - cleanup: - amal_mutex_tryunlock(&self->mutex); - return result; -} - -static CHECK_RESULT int amal_compiler_load_file_join_threads(amal_compiler *self) { - int i; - int result; - void *thread_return_data; - ParserThreadData *parser_thread_data; - - assert(amal_thread_is_main()); - thread_return_data = NULL; - for(;;) { - bool done; - /* - Joining running threads. After checking one running thread another one might start up, - so this is mostly to wait for threads to finish and to sleep without doing work. - The check after that (amal_compiler_all_threads_done) check that all threads are done correctly - */ - for(i = 0; i < self->usable_thread_count; ++i) { - result = amal_mutex_lock(&self->mutex, "amal_compiler_load_file_join_threads, waiting for workers"); - parser_thread_data = &self->threads[i]; - amal_mutex_tryunlock(&self->mutex); - if(result != 0) - goto cleanup; - /* TODO: What to do if join fails? */ - ignore_result_int(parser_thread_data_join(parser_thread_data, &thread_return_data)); - if(thread_return_data != NULL) { - amal_log_error("Failed, waiting for jobs to finish"); - self->work_failed = bool_true; - } - } - - done = amal_compiler_check_all_threads_done(self); - if(done) - break; - } - - result = AMAL_COMPILER_OK; - cleanup: - if(self->work_failed) - result = AMAL_COMPILER_ERR; return result; } static CHECK_RESULT int amal_compiler_dispatch_generic(amal_compiler *self, ThreadWorkType work_type) { - Parser **parser; - Parser **parser_end; - parser = buffer_begin(&self->parsers); - parser_end = buffer_end(&self->parsers); - self->generic_work_object_index = 0; + Parser **parser = buffer_begin(&self->parsers); + Parser **parser_end = buffer_end(&self->parsers); for(; parser != parser_end; ++parser) { - ParserThreadData *thread_selected; ThreadWorkData thread_work_data; thread_work_data.type = work_type; thread_work_data.value.parser = *parser; - return_if_error(amal_compiler_select_thread_for_work(self, thread_work_data, &thread_selected)); - /* After all threads have been used, they will handle using the remaining parsers or stop if there is an error */ - if(!thread_selected) - break; + return_if_error(amal_compiler_add_task(self, thread_work_data)); } - return amal_compiler_load_file_join_threads(self); + return thread_pool_join_all_tasks(&self->stage_task_thread_pool) ? 0 : -1; } static CHECK_RESULT int amal_compiler_generate_program(amal_compiler *self) { @@ -498,10 +324,8 @@ static CHECK_RESULT int amal_compiler_generate_program(amal_compiler *self) { TODO: Copying the bytecode to the program can be done using multiple threads. Use self->threads for that. */ - Parser **parser; - Parser **parser_end; - parser = buffer_begin(&self->parsers); - parser_end = buffer_end(&self->parsers); + Parser **parser = buffer_begin(&self->parsers); + Parser **parser_end = buffer_end(&self->parsers); for(; parser != parser_end; ++parser) { return_if_error(amal_program_append_bytecode(self->program, &(*parser)->bytecode)); } @@ -580,15 +404,10 @@ static CHECK_RESULT int validate_main_func(FileScopeReference *main_file_scope, } int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath, FileScopeReference **file_scope) { - int result; - ParserThreadData *parser_thread_data; ThreadWorkData thread_work_data; bool main_job; bool new_entry; - if(self->work_failed) - return AMAL_COMPILER_WORK_FAIL_ABORT; - return_if_error(try_create_file_scope(self, filepath, file_scope, &new_entry)); assert(file_scope && *file_scope && (*file_scope)->canonical_path.data); if(!new_entry) { @@ -596,7 +415,6 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath, return 0; } - result = AMAL_COMPILER_ERR; thread_work_data.type = THREAD_WORK_PARSE; thread_work_data.value.file_scope = *file_scope; main_job = bool_false; @@ -607,7 +425,7 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath, main_job = bool_true; } - return_if_error(amal_compiler_select_thread_for_work(self, thread_work_data, &parser_thread_data)); + return_if_error(amal_compiler_add_task(self, thread_work_data)); if(main_job) { /*doc(Compiler flow) @@ -619,8 +437,8 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath, */ LhsExpr *main_func; - return_if_error(amal_compiler_load_file_join_threads(self)); - assert(amal_compiler_check_all_threads_done(self)); + if(!thread_pool_join_all_tasks(&self->stage_task_thread_pool)) + return -1; amal_log_info("Finished parsing all files, resolving AST"); return_if_error(validate_main_func(*file_scope, &main_func)); @@ -631,15 +449,12 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath, main_func->decl_flags |= DECL_FLAG_EXPORT; return_if_error(amal_compiler_dispatch_generic(self, THREAD_WORK_RESOLVE_AST)); - assert(amal_compiler_check_all_threads_done(self)); amal_log_info("Finished resolving AST, generating SSA"); return_if_error(amal_compiler_dispatch_generic(self, THREAD_WORK_GENERATE_SSA)); - assert(amal_compiler_check_all_threads_done(self)); amal_log_info("Finished generating SSA"); return_if_error(amal_compiler_dispatch_generic(self, THREAD_WORK_GENERATE_BYTECODE)); - assert(amal_compiler_check_all_threads_done(self)); amal_log_info("Finished generating bytecode"); return_if_error(amal_compiler_generate_program(self)); @@ -648,14 +463,5 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath, return AMAL_COMPILER_OK; } - if(parser_thread_data) - return AMAL_COMPILER_OK; - - cleanup_if_error(amal_mutex_lock(&self->mutex, "amal_compiler_load_file")); - cleanup_if_error(buffer_append(&self->queued_files, file_scope, sizeof(FileScopeReference*))); - result = AMAL_COMPILER_OK; - - cleanup: - amal_mutex_tryunlock(&self->mutex); - return result; + return 0; } diff --git a/src/parser.c b/src/parser.c index 03b835e..85165d7 100644 --- a/src/parser.c +++ b/src/parser.c @@ -30,31 +30,6 @@ static void parser_parse_var_type(Parser *self, VariableType *result); static void parser_parse_var_type_def(Parser *self, VariableType *result); static void parser_queue_file(Parser *self, BufferView path, FileScopeReference **file_scope); -int parser_thread_data_init(ParserThreadData *self) { - am_memset(&self->allocator, 0, sizeof(self->allocator)); - am_memset(&self->thread, 0, sizeof(self->thread)); - self->status = PARSER_THREAD_STATUS_NEW; - return arena_allocator_init(&self->allocator); -} - -void parser_thread_data_deinit(ParserThreadData *self) { - ignore_result_int(amal_thread_deinit(&self->thread)); - arena_allocator_deinit(&self->allocator); -} - -int parser_thread_data_start(ParserThreadData *self, AmalThreadCallbackFunc callback_func, void *userdata) { - return_if_error(amal_thread_deinit(&self->thread)); - return_if_error(amal_thread_create(&self->thread, AMAL_THREAD_JOINABLE, "Parser", callback_func, userdata)); - self->status = PARSER_THREAD_STATUS_RUNNING; - return 0; -} - -int parser_thread_data_join(ParserThreadData *self, void **result) { - if(self->status == PARSER_THREAD_STATUS_NEW) - return 0; - return amal_thread_join(&self->thread, result); -} - int parser_init(Parser *self, amal_compiler *compiler, ArenaAllocator *allocator) { self->allocator = allocator; self->compiler = compiler; diff --git a/src/program.c b/src/program.c index 17aee03..082f9fd 100644 --- a/src/program.c +++ b/src/program.c @@ -216,6 +216,8 @@ static CHECK_RESULT int amal_program_read_strings(amal_program *self) { if(bytes_left_to_read(self) < strings_size) return AMAL_PROGRAM_INVALID_STRINGS_SIZE; + am_free(self->string_indices); + self->string_indices = NULL; if(am_malloc(sizeof(u32) * self->num_strings, (void**)&self->string_indices) != 0) return AMAL_PROGRAM_ALLOC_FAILURE; string_index_ptr = self->string_indices; @@ -266,6 +268,8 @@ static CHECK_RESULT int amal_program_read_external_functions(amal_program *self) if(bytes_left_to_read(self) < extern_funcs_size) return AMAL_PROGRAM_INVALID_EXTERNAL_FUNCTIONS_SIZE; + am_free(self->extern_func_indices); + self->extern_func_indices = NULL; if(am_malloc(sizeof(u32) * self->num_extern_functions, (void**)&self->extern_func_indices) != 0) return AMAL_PROGRAM_ALLOC_FAILURE; extern_func_index_ptr = self->extern_func_indices; diff --git a/src/ssa/ssa.c b/src/ssa/ssa.c index 39b5a80..500555a 100644 --- a/src/ssa/ssa.c +++ b/src/ssa/ssa.c @@ -551,10 +551,8 @@ static CHECK_RESULT SsaRegister funccall_generate_ssa(FunctionCall *self, AstRes LhsExpr *func_lhs_expr; { - Ast **arg; - Ast **arg_end; - arg = buffer_begin(&self->args); - arg_end = buffer_end(&self->args); + Ast **arg = buffer_begin(&self->args); + Ast **arg_end = buffer_end(&self->args); for(; arg != arg_end; ++arg) { SsaRegister arg_reg; arg_reg = ast_generate_ssa(*arg, context); @@ -781,10 +779,8 @@ CHECK_RESULT SsaRegister scope_named_object_generate_ssa(ScopeNamedObject *self, } void scope_generate_ssa(Scope *self, SsaCompilerContext *context) { - Ast **ast; - Ast **ast_end; - ast = buffer_begin(&self->ast_objects); - ast_end = buffer_end(&self->ast_objects); + Ast **ast = buffer_begin(&self->ast_objects); + Ast **ast_end = buffer_end(&self->ast_objects); for(; ast != ast_end; ++ast) { ignore_result_int(ast_generate_ssa(*ast, context)); } diff --git a/src/std/alloc.c b/src/std/alloc.c index d07e94f..44a5105 100644 --- a/src/std/alloc.c +++ b/src/std/alloc.c @@ -15,7 +15,7 @@ int am_malloc(usize size, void **mem) { int am_realloc(void *mem, usize new_size, void **new_mem) { void *new_allocated_data = realloc(mem, new_size); - if(!new_allocated_data) { + if(!new_allocated_data && new_size != 0) { amal_log_error("am_malloc: failed to reallocate memory to size %lu", new_size); return ALLOC_FAIL; } diff --git a/src/std/arena_allocator.c b/src/std/arena_allocator.c index 4934925..11fb40d 100644 --- a/src/std/arena_allocator.c +++ b/src/std/arena_allocator.c @@ -32,10 +32,8 @@ int arena_allocator_init(ArenaAllocator *self) { } static void arena_allocator_deinit_buffers(ArenaAllocator *self) { - void **mem; - void **mems_end; - mem = buffer_begin(&self->mems); - mems_end = buffer_end(&self->mems); + void **mem = buffer_begin(&self->mems); + void **mems_end = buffer_end(&self->mems); while(mem != mems_end) { am_free(*mem); ++mem; diff --git a/src/std/log.c b/src/std/log.c index 88ff0cf..59e0319 100644 --- a/src/std/log.c +++ b/src/std/log.c @@ -12,7 +12,7 @@ static bool mutex_initialized = bool_false; /* Safe to call multiple times */ static void mutex_init() { if(!mutex_initialized) { - amal_mutex_init(&mutex); + ignore_result_int(amal_mutex_init(&mutex)); mutex_initialized = bool_true; } } diff --git a/src/std/thread.c b/src/std/thread.c index fb0e9ab..87362d2 100644 --- a/src/std/thread.c +++ b/src/std/thread.c @@ -107,13 +107,13 @@ int amal_thread_join(amal_thread *self, void **result) { return 0; } -void amal_mutex_init(amal_mutex *self) { - pthread_mutex_init(&self->mutex, NULL); +int amal_mutex_init(amal_mutex *self) { #ifdef AMAL_MUTEX_DEBUG self->lock_identifier = NULL; #endif self->locked = bool_false; self->owner_thread = 0; + return pthread_mutex_init(&self->mutex, NULL); } void amal_mutex_deinit(amal_mutex *self) { diff --git a/src/std/thread_pool.c b/src/std/thread_pool.c new file mode 100644 index 0000000..8f6e180 --- /dev/null +++ b/src/std/thread_pool.c @@ -0,0 +1,187 @@ +#include "../../include/std/thread_pool.h" +#include "../../include/std/alloc.h" +#include "../../include/std/mem.h" +#include "../../include/std/log.h" + +/* Sets @result to NULL if there are no available tasks */ +static CHECK_RESULT int thread_pool_take_task(amal_thread_pool *self, amal_thread_pool_task **result) { + *result = NULL; + cleanup_if_error(amal_mutex_lock(&self->task_select_mutex, "thread_pool_take_task")); + if(self->num_finished_queued_tasks < (int)buffer_get_size(&self->queued_tasks, amal_thread_pool_task) && !self->dead) + *result = buffer_get(&self->queued_tasks, self->num_finished_queued_tasks, sizeof(amal_thread_pool_task)); + cleanup: + amal_mutex_tryunlock(&self->task_select_mutex); + return 0; +} + +static void* thread_pool_thread_callback(void *userdata) { + amal_thread_pool_callback_data *thread_pool_data = userdata; + if(thread_pool_data->thread_pool->dead) + goto cleanup; + + if(thread_pool_data->callback(thread_pool_data->userdata) != 0) { + thread_pool_mark_dead(thread_pool_data->thread_pool); + goto cleanup; + } + + for(;;) { + amal_thread_pool_task *new_task; + cleanup_if_error(thread_pool_take_task(thread_pool_data->thread_pool, &new_task)); + if(!new_task) + break; + + if(new_task->callback(new_task->userdata) != 0) { + thread_pool_mark_dead(thread_pool_data->thread_pool); + goto cleanup; + } + } + + cleanup: + thread_pool_data->thread_pool_thread->status = THREAD_POOL_THREAD_STATUS_IDLE; + am_free(thread_pool_data); + return NULL; +} + +static void thread_pool_thread_init(amal_thread_pool_thread *self) { + am_memset(&self->thread, 0, sizeof(self->thread)); + self->status = THREAD_POOL_THREAD_STATUS_NEW; +} + +static void thread_pool_thread_deinit(amal_thread_pool_thread *self) { + ignore_result_int(amal_thread_deinit(&self->thread)); +} + +static CHECK_RESULT int thread_pool_thread_start(amal_thread_pool_thread *self, amal_thread_pool *thread_pool, amal_thread_job_callback callback, void *userdata) { + amal_thread_pool_callback_data *callback_data; + return_if_error(am_malloc(sizeof(amal_thread_pool_callback_data), (void**)&callback_data)); + callback_data->thread_pool = thread_pool; + callback_data->thread_pool_thread = self; + callback_data->callback = callback; + callback_data->userdata = userdata; + + return_if_error(amal_thread_deinit(&self->thread)); + return_if_error(amal_thread_create(&self->thread, AMAL_THREAD_JOINABLE, "thread_pool_thread_start", thread_pool_thread_callback, callback_data)); + self->status = THREAD_POOL_THREAD_STATUS_RUNNING; + return 0; +} + +static CHECK_RESULT int thread_pool_thread_join(amal_thread_pool_thread *self) { + if(self->status == THREAD_POOL_THREAD_STATUS_NEW) + return 0; + return amal_thread_join(&self->thread, NULL); +} + +int thread_pool_init(amal_thread_pool *self, int num_threads) { + int i; + self->num_threads = num_threads != 0 ? num_threads : amal_get_usable_thread_count(); + if(self->num_threads == 0) { + amal_log_warning("Unable to get the number of threads available on the system, using 1 thread."); + self->num_threads = 1; + } + + self->dead = bool_false; + self->num_finished_queued_tasks = 0; + self->threads = NULL; + + ignore_result_int(buffer_init(&self->queued_tasks, NULL)); + return_if_error(amal_mutex_init(&self->task_select_mutex)); + cleanup_if_error(am_malloc(self->num_threads * sizeof(amal_thread_pool_thread), (void**)&self->threads)); + for(i = 0; i < self->num_threads; ++i) + thread_pool_thread_init(&self->threads[i]); + return 0; + + cleanup: + am_free(self->threads); + self->num_threads = 0; + return -1; +} + +void thread_pool_deinit(amal_thread_pool *self) { + if(self->threads) { + int i; + for(i = 0; i < self->num_threads; ++i) + thread_pool_thread_deinit(&self->threads[i]); + am_free(self->threads); + } + amal_mutex_deinit(&self->task_select_mutex); + buffer_deinit(&self->queued_tasks); +} + +int thread_pool_add_task(amal_thread_pool *self, amal_thread_job_callback callback, void *userdata) { + int i; + bool found_available_thread = bool_false; + int result = -1; + + if(self->dead) + return result; + + cleanup_if_error(amal_mutex_lock(&self->task_select_mutex, "thread_pool_add_task")); + for(i = 0; i < self->num_threads; ++i) { + amal_thread_pool_thread *thread = &self->threads[i]; + if(thread->status != THREAD_POOL_THREAD_STATUS_RUNNING) { + cleanup_if_error(thread_pool_thread_start(thread, self, callback, userdata)); + found_available_thread = bool_true; + break; + } + } + + if(!found_available_thread) { + amal_thread_pool_task task; + task.callback = callback; + task.userdata = userdata; + cleanup_if_error(buffer_append(&self->queued_tasks, &task, sizeof(task))); + } + + result = 0; + cleanup: + amal_mutex_tryunlock(&self->task_select_mutex); + return result; +} + +bool thread_pool_join_all_tasks(amal_thread_pool *self) { + bool died; + for(;;) { + /* + Joining running threads. After checking one running thread another one might start up, + so this is mostly to wait for threads to finish and to sleep without doing work. + The check after that (thread_pool_check_threads_finished) check that all threads have finished correctly + */ + int i; + bool finished = bool_true; + for(i = 0; i < self->num_threads; ++i) { + amal_thread_pool_thread_status thread_status; + if(amal_mutex_lock(&self->task_select_mutex, "thread_pool_join_all_tasks") != 0) + thread_pool_mark_dead(self); + thread_status = self->threads[i].status; + amal_mutex_tryunlock(&self->task_select_mutex); + /* TODO: What to do if join fails? */ + switch(thread_status) { + case THREAD_POOL_THREAD_STATUS_NEW: + break; + case THREAD_POOL_THREAD_STATUS_RUNNING: + finished = bool_false; + /* fallthrough */ + case THREAD_POOL_THREAD_STATUS_IDLE: + ignore_result_int(thread_pool_thread_join(&self->threads[i])); + break; + } + } + + if(finished) + break; + } + + died = self->dead; + self->dead = bool_false; + buffer_clear(&self->queued_tasks); + self->num_finished_queued_tasks = 0; + return !died; +} + +void thread_pool_mark_dead(amal_thread_pool *self) { + self->dead = bool_true; +} + +BufferView thread_pool_get_threads(amal_thread_pool *self) { + return create_buffer_view((const char*)self->threads, self->num_threads * sizeof(amal_thread_pool_thread)); +} diff --git a/tests/bytecode.amal b/tests/bytecode.amal index 0aad6de..a8ea53b 100644 --- a/tests/bytecode.amal +++ b/tests/bytecode.amal @@ -1,3 +1,5 @@ +const b = @import("b.amal"); + extern const print_extern: fn; extern const print_extern_num: fn(num: i32); -- cgit v1.2.3