aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordec05eba <dec05eba@protonmail.com>2019-08-22 00:59:49 +0200
committerdec05eba <dec05eba@protonmail.com>2020-07-25 14:36:46 +0200
commitd6f368a3f400fea3e89280262a8147e7ce5d855c (patch)
tree4eb64eb0d18dad1e40c70a5bff974fe8033fe389
parentdf640dc7f55fef962b598562e10d8dd4d60fedc0 (diff)
Move thread work from compiler/parser to thread_work file, fix use after free bug in multithreaded parser allocator
-rw-r--r--README.md12
-rw-r--r--executor/x86_64/asm.c2
-rw-r--r--include/compiler.h9
-rw-r--r--include/defs.h1
-rw-r--r--include/parser.h17
-rw-r--r--include/std/buffer.h2
-rw-r--r--include/std/thread.h2
-rw-r--r--include/std/thread_pool.h71
-rw-r--r--src/ast.c9
-rw-r--r--src/compiler.c390
-rw-r--r--src/parser.c25
-rw-r--r--src/program.c4
-rw-r--r--src/ssa/ssa.c12
-rw-r--r--src/std/alloc.c2
-rw-r--r--src/std/arena_allocator.c6
-rw-r--r--src/std/log.c2
-rw-r--r--src/std/thread.c4
-rw-r--r--src/std/thread_pool.c187
-rw-r--r--tests/bytecode.amal2
19 files changed, 389 insertions, 370 deletions
diff --git a/README.md b/README.md
index 4a1f0b5..6dc4086 100644
--- a/README.md
+++ b/README.md
@@ -31,10 +31,14 @@ Verify all code execution paths in a function return a value, if the function ex
Show compile error if the result of a function call is ignored.\
Show compile error if function result type and assigned to variable have different types.\
Show compile error if variables are assigned to but not used.\
-Push arguments in reverse order (right-to-left, cdecl) (in program.c, since on windows we will need to support stdcall which is left-to-right).
-## Urgent
-Simplify src/compiler.c, it's pretty complex with the thread work done right now. The thread work should be put in a
-thread dispatch file that only handles thread job dispatching.
+Push arguments in reverse order (right-to-left, cdecl) (in program.c, since on windows we will need to support stdcall which is left-to-right).\
+After tokenizing files, unload file data to disk. Otherwise large programs (as large as chromium) will need gigabytes of ram to compile.
# Documents
Documents are located under doc. The file doc/Documentation.md is generated from source files by running doc/doc_extract.py
but there is no need to run this script unless you are modifying documentation in the source.
+# Development
+Define `AMAL_MUTEX_DEBUG` in src/thread.c to enable mutex debugging.\
+Set `PEDANTIC` environment variable to 1 before compiling to enable pedantic mode.\
+Set `SANITIZE_ADDRESS` environment variable to 1 before compiling to run compile with asan.\
+Set `SANITIZE_THREAD` environment variable to 1 before compiling to run compile with tsan.\
+Set `SCAN_BUILD` environment variable to 1 before compiling to run scan-build on the source files. \ No newline at end of file
diff --git a/executor/x86_64/asm.c b/executor/x86_64/asm.c
index 606e539..c741f15 100644
--- a/executor/x86_64/asm.c
+++ b/executor/x86_64/asm.c
@@ -315,7 +315,7 @@ static void asm_rm(Asm *self, AsmPtr *mem, Reg64 reg) {
}
*self->code_it++ = (reg << 3) | rm_byte;
- /* RSP requires SIB byte when displacement is not 0 */
+ /* RSP requires SIB byte */
if(mem->base == RSP)
*self->code_it++ = 0x24;
}
diff --git a/include/compiler.h b/include/compiler.h
index 08b74b8..ac50d28 100644
--- a/include/compiler.h
+++ b/include/compiler.h
@@ -5,7 +5,7 @@
#include "std/buffer.h"
#include "std/buffer_view.h"
#include "std/arena_allocator.h"
-#include "std/thread.h"
+#include "std/thread_pool.h"
#include "compiler_options.h"
#include "ast.h"
#include "program.h"
@@ -49,20 +49,17 @@ struct amal_compiler {
ArenaAllocator allocator;
Scope root_scope;
Buffer/*<Parser*>*/ parsers;
- Buffer/*<FileScopeReference*>*/ queued_files;
HashMapType(BufferView, FileScopeReference*) file_scopes;
- ParserThreadData *threads;
- int usable_thread_count;
bool started;
- bool work_failed;
amal_mutex mutex;
- int generic_work_object_index;
+ amal_thread_pool stage_task_thread_pool;
};
void amal_compiler_options_init(amal_compiler_options *self);
/*
If @options is NULL, then default values are used.
+ @options are copied.
You should run @amal_program_deinit even @amal_compiler_load_file fails, to cleanup memory.
This function creates a copy of @filepath so it doesn't have to survive longer than this function call.
The file has to be in utf-8 format and it can optionally have utf-8 BOM.
diff --git a/include/defs.h b/include/defs.h
index 8cd9d39..c8db820 100644
--- a/include/defs.h
+++ b/include/defs.h
@@ -1,7 +1,6 @@
#ifndef AMALGAM_DEFS_H
#define AMALGAM_DEFS_H
-typedef struct ParserThreadData ParserThreadData;
typedef struct amal_compiler amal_compiler;
typedef struct Parser Parser;
typedef struct Scope Scope;
diff --git a/include/parser.h b/include/parser.h
index 4427cc8..81895e1 100644
--- a/include/parser.h
+++ b/include/parser.h
@@ -17,18 +17,6 @@
#define PARSER_UNEXPECTED_TOKEN -2
typedef enum {
- PARSER_THREAD_STATUS_NEW,
- PARSER_THREAD_STATUS_IDLE,
- PARSER_THREAD_STATUS_RUNNING
-} ParserThreadStatus;
-
-struct ParserThreadData {
- amal_thread thread;
- ParserThreadStatus status;
- ArenaAllocator allocator;
-};
-
-typedef enum {
ERROR_CONTEXT_NONE,
ERROR_CONTEXT_RHS_STANDALONE,
ERROR_CONTEXT_NO_LHS
@@ -51,11 +39,6 @@ struct Parser {
Bytecode bytecode;
};
-CHECK_RESULT int parser_thread_data_init(ParserThreadData *self);
-void parser_thread_data_deinit(ParserThreadData *self);
-CHECK_RESULT int parser_thread_data_start(ParserThreadData *self, AmalThreadCallbackFunc callback_func, void *userdata);
-CHECK_RESULT int parser_thread_data_join(ParserThreadData *self, void **result);
-
CHECK_RESULT int parser_init(Parser *self, amal_compiler *compiler, ArenaAllocator *allocator);
/*
diff --git a/include/std/buffer.h b/include/std/buffer.h
index ac722b1..d194881 100644
--- a/include/std/buffer.h
+++ b/include/std/buffer.h
@@ -30,7 +30,7 @@ CHECK_RESULT int buffer_append_empty(Buffer *self, usize size);
CHECK_RESULT int buffer_expand(Buffer *self, usize size);
void* buffer_get(Buffer *self, usize index, usize type_size);
CHECK_RESULT int buffer_pop(Buffer *self, void *data, usize size);
-/* Set buffer size to 0, doesn't reallocate */
+/* Set buffer size to 0, doesn't change the capacity */
void buffer_clear(Buffer *self);
void* buffer_begin(Buffer *self);
void* buffer_end(Buffer *self);
diff --git a/include/std/thread.h b/include/std/thread.h
index 356ebf0..2765204 100644
--- a/include/std/thread.h
+++ b/include/std/thread.h
@@ -47,7 +47,7 @@ bool amal_thread_is_main();
/* Returns 0 if the number of usable threads is unknown */
int amal_get_usable_thread_count();
-void amal_mutex_init(amal_mutex *self);
+CHECK_RESULT int amal_mutex_init(amal_mutex *self);
void amal_mutex_deinit(amal_mutex *self);
CHECK_RESULT int amal_mutex_lock(amal_mutex *self, const char *lock_identifier);
/* Safe to call unlock when another thread owns the lock or if the lock is not locked */
diff --git a/include/std/thread_pool.h b/include/std/thread_pool.h
new file mode 100644
index 0000000..f8acf83
--- /dev/null
+++ b/include/std/thread_pool.h
@@ -0,0 +1,71 @@
+#ifndef AMAL_THREAD_POOL_H
+#define AMAL_THREAD_POOL_H
+
+#include "thread.h"
+#include "buffer.h"
+#include "buffer_view.h"
+
+/*
+ Return 0 if there is no error, otherwise return a non-0 value.
+ If the result is not 0, then all additional tasks will be stopped.
+*/
+typedef int(*amal_thread_job_callback)(void *userdata);
+
+typedef enum {
+ THREAD_POOL_THREAD_STATUS_NEW,
+ THREAD_POOL_THREAD_STATUS_IDLE,
+ THREAD_POOL_THREAD_STATUS_RUNNING
+} amal_thread_pool_thread_status;
+
+typedef struct {
+ amal_thread thread;
+ amal_thread_pool_thread_status status;
+} amal_thread_pool_thread;
+
+typedef struct {
+ amal_thread_job_callback callback;
+ void *userdata;
+} amal_thread_pool_task;
+
+typedef struct {
+ int num_threads;
+ amal_thread_pool_thread *threads; /* Size of @threads is @num_threads */
+ amal_mutex task_select_mutex;
+ Buffer queued_tasks;
+ bool dead;
+ int num_finished_queued_tasks;
+} amal_thread_pool;
+
+typedef struct {
+ amal_thread_pool *thread_pool;
+ amal_thread_pool_thread *thread_pool_thread;
+ amal_thread_job_callback callback;
+ void *userdata;
+} amal_thread_pool_callback_data;
+
+/*
+ If @num_threads is 0, then the number of threads selected will the number
+ of physical threads, or 1
+*/
+CHECK_RESULT int thread_pool_init(amal_thread_pool *self, int num_threads);
+void thread_pool_deinit(amal_thread_pool *self);
+/*
+ Thread-safe. Will fail if one task has failed, in which case it can only be used
+ once @thread_pool_join_all_tasks has been called
+*/
+CHECK_RESULT int thread_pool_add_task(amal_thread_pool *self, amal_thread_job_callback callback, void *userdata);
+/*
+ Wait until all tasks have finished. Should only be called from one thread.
+ Returns true if all tasks finished without any issues (if @thread_pool_mark_dead was not called)
+*/
+CHECK_RESULT bool thread_pool_join_all_tasks(amal_thread_pool *self);
+/*
+ Stop tasks as soon as possible and stop dispatch of new tasks.
+ This can be used when work on one thread has failed and you want to
+ stop tasks on all threads to report errors and stop additional tasks.
+*/
+void thread_pool_mark_dead(amal_thread_pool *self);
+
+BufferView/*<amal_thread_pool_thread>*/ thread_pool_get_threads(amal_thread_pool *self);
+
+#endif
diff --git a/src/ast.c b/src/ast.c
index 1a44ce1..cb4d875 100644
--- a/src/ast.c
+++ b/src/ast.c
@@ -421,13 +421,10 @@ int scope_add_child(Scope *self, Ast *child) {
}
void scope_resolve(Scope *self, AstCompilerContext *context) {
- Ast **ast;
- Ast **ast_end;
- Scope *prev_scope;
+ Ast **ast = buffer_begin(&self->ast_objects);
+ Ast **ast_end = buffer_end(&self->ast_objects);
+ Scope *prev_scope = context->scope;
- ast = buffer_begin(&self->ast_objects);
- ast_end = buffer_end(&self->ast_objects);
- prev_scope = context->scope;
context->scope = self;
for(; ast != ast_end; ++ast) {
ast_resolve(*ast, context);
diff --git a/src/compiler.c b/src/compiler.c
index 803515b..fd9e4ae 100644
--- a/src/compiler.c
+++ b/src/compiler.c
@@ -25,37 +25,40 @@ static usize strnlen(const char *str, usize max_length) {
}
/* TODO: Allow to specify size and members? */
-static CHECK_RESULT int create_default_type(amal_compiler *compiler, const char *name, LhsExpr **lhs_expr) {
+static CHECK_RESULT int create_default_type(amal_compiler *compiler, const char *name, amal_default_type **default_type) {
StructDecl *struct_decl;
Ast *expr;
+ LhsExpr *lhs_expr;
return_if_error(arena_allocator_alloc(&compiler->allocator, sizeof(StructDecl), (void**)&struct_decl));
return_if_error(structdecl_init(struct_decl, &compiler->root_scope, &compiler->allocator));
- return_if_error(arena_allocator_alloc(&compiler->allocator, sizeof(LhsExpr), (void**)lhs_expr));
- lhsexpr_init(*lhs_expr, DECL_FLAG_EXTERN | DECL_FLAG_PUB | DECL_FLAG_CONST, create_buffer_view(name, strnlen(name, PATH_MAX)));
- return_if_error(ast_create(&compiler->allocator, struct_decl, AST_STRUCT_DECL, &(*lhs_expr)->rhs_expr));
- return_if_error(ast_create(&compiler->allocator, *lhs_expr, AST_LHS, &expr));
+ return_if_error(arena_allocator_alloc(&compiler->allocator, sizeof(amal_default_type), (void**)default_type));
+ lhs_expr = &(*default_type)->lhs_expr;
+
+ lhsexpr_init(lhs_expr, DECL_FLAG_EXTERN | DECL_FLAG_PUB | DECL_FLAG_CONST, create_buffer_view(name, strnlen(name, PATH_MAX)));
+ return_if_error(ast_create(&compiler->allocator, struct_decl, AST_STRUCT_DECL, &lhs_expr->rhs_expr));
+ return_if_error(ast_create(&compiler->allocator, lhs_expr, AST_LHS, &expr));
expr->resolve_data.type.type = RESOLVED_TYPE_LHS_EXPR;
- expr->resolve_data.type.value.lhs_expr = *lhs_expr;
+ expr->resolve_data.type.value.lhs_expr = lhs_expr;
expr->resolve_data.status = AST_RESOLVED;
return scope_add_child(&compiler->root_scope, expr);
}
static CHECK_RESULT int init_default_types(amal_compiler *compiler) {
- return_if_error(create_default_type(compiler, "i8", (LhsExpr**)&compiler->default_types.i8));
- return_if_error(create_default_type(compiler, "i16", (LhsExpr**)&compiler->default_types.i16));
- return_if_error(create_default_type(compiler, "i32", (LhsExpr**)&compiler->default_types.i32));
- return_if_error(create_default_type(compiler, "i64", (LhsExpr**)&compiler->default_types.i64));
- return_if_error(create_default_type(compiler, "u8", (LhsExpr**)&compiler->default_types.u8));
- return_if_error(create_default_type(compiler, "u16", (LhsExpr**)&compiler->default_types.u16));
- return_if_error(create_default_type(compiler, "u32", (LhsExpr**)&compiler->default_types.u32));
- return_if_error(create_default_type(compiler, "u64", (LhsExpr**)&compiler->default_types.u64));
- return_if_error(create_default_type(compiler, "isize", (LhsExpr**)&compiler->default_types.isize));
- return_if_error(create_default_type(compiler, "usize", (LhsExpr**)&compiler->default_types.usize));
- return_if_error(create_default_type(compiler, "f32", (LhsExpr**)&compiler->default_types.f32));
- return_if_error(create_default_type(compiler, "f64", (LhsExpr**)&compiler->default_types.f64));
- return_if_error(create_default_type(compiler, "str", (LhsExpr**)&compiler->default_types.str));
+ return_if_error(create_default_type(compiler, "i8", &compiler->default_types.i8));
+ return_if_error(create_default_type(compiler, "i16", &compiler->default_types.i16));
+ return_if_error(create_default_type(compiler, "i32", &compiler->default_types.i32));
+ return_if_error(create_default_type(compiler, "i64", &compiler->default_types.i64));
+ return_if_error(create_default_type(compiler, "u8", &compiler->default_types.u8));
+ return_if_error(create_default_type(compiler, "u16", &compiler->default_types.u16));
+ return_if_error(create_default_type(compiler, "u32", &compiler->default_types.u32));
+ return_if_error(create_default_type(compiler, "u64", &compiler->default_types.u64));
+ return_if_error(create_default_type(compiler, "isize", &compiler->default_types.isize));
+ return_if_error(create_default_type(compiler, "usize", &compiler->default_types.usize));
+ return_if_error(create_default_type(compiler, "f32", &compiler->default_types.f32));
+ return_if_error(create_default_type(compiler, "f64", &compiler->default_types.f64));
+ return_if_error(create_default_type(compiler, "str", &compiler->default_types.str));
compiler->default_types.arithmetic_types[0] = compiler->default_types.i8;
compiler->default_types.arithmetic_types[1] = compiler->default_types.u8;
@@ -86,11 +89,10 @@ static CHECK_RESULT int init_default_types(amal_compiler *compiler) {
bool is_arithmetic_type(LhsExpr *expr, amal_compiler *compiler) {
usize i;
- const amal_default_types *default_types;
- default_types = &compiler->default_types;
+ const amal_default_types *default_types = &compiler->default_types;
for(i = 0; i < NUM_ARITHMETIC_TYPES; ++i) {
- if(expr == (LhsExpr*)default_types->arithmetic_types[i])
+ if(expr == &default_types->arithmetic_types[i]->lhs_expr)
return bool_true;
}
return bool_false;
@@ -103,19 +105,6 @@ void amal_compiler_options_init(amal_compiler_options *self) {
}
static CHECK_RESULT int amal_compiler_init(amal_compiler *self, const amal_compiler_options *options, amal_program *program) {
- int i;
-
- self->usable_thread_count = options ? options->num_threads : 0;
- if(self->usable_thread_count == 0) {
- self->usable_thread_count = amal_get_usable_thread_count();
- if(self->usable_thread_count == 0) {
- amal_log_warning("Unable to get the number of threads available on the system, using 1 thread.");
- amal_log_warning("You can override the number of threads used by setting compiler option for number of thread to use.");
- self->usable_thread_count = 1;
- }
- }
- amal_log_info("Using %d threads", self->usable_thread_count);
-
am_memset(&self->allocator, 0, sizeof(self->allocator));
am_memset(&self->root_scope, 0, sizeof(self->root_scope));
if(options)
@@ -124,20 +113,13 @@ static CHECK_RESULT int amal_compiler_init(amal_compiler *self, const amal_compi
amal_compiler_options_init(&self->options);
self->program = program;
self->started = bool_false;
- self->work_failed = bool_false;
- self->generic_work_object_index = 0;
- amal_mutex_init(&self->mutex);
+ return_if_error(thread_pool_init(&self->stage_task_thread_pool, options ? options->num_threads : 0));
+ return_if_error(amal_mutex_init(&self->mutex));
return_if_error(arena_allocator_init(&self->allocator));
cleanup_if_error(scope_init(&self->root_scope, NULL, &self->allocator));
cleanup_if_error(buffer_init(&self->parsers, &self->allocator));
- cleanup_if_error(buffer_init(&self->queued_files, &self->allocator));
cleanup_if_error(hash_map_init(&self->file_scopes, &self->allocator, sizeof(FileScopeReference*), hash_map_compare_string, amal_hash_string));
- cleanup_if_error(arena_allocator_alloc(&self->allocator,
- self->usable_thread_count * sizeof(ParserThreadData),
- (void**)&self->threads));
- for(i = 0; i < self->usable_thread_count; ++i)
- cleanup_if_error(parser_thread_data_init(&self->threads[i]));
cleanup_if_error(init_default_types(self));
return AMAL_COMPILER_OK;
@@ -147,11 +129,13 @@ static CHECK_RESULT int amal_compiler_init(amal_compiler *self, const amal_compi
}
void amal_compiler_deinit(amal_compiler *self) {
- int i;
- for(i = 0; i < self->usable_thread_count; ++i) {
- parser_thread_data_deinit(&self->threads[i]);
+ Parser **parser = buffer_begin(&self->parsers);
+ Parser **parser_end = buffer_end(&self->parsers);
+ for(; parser != parser_end; ++parser) {
+ if((*parser)->allocator)
+ arena_allocator_deinit((*parser)->allocator);
}
-
+ thread_pool_deinit(&self->stage_task_thread_pool);
amal_mutex_deinit(&self->mutex);
arena_allocator_deinit(&self->allocator);
}
@@ -165,13 +149,11 @@ typedef enum {
typedef struct {
amal_compiler *compiler;
- ParserThreadData *parser_thread_data;
FileScopeReference *file_scope;
} CompilerParserThreadUserData;
typedef struct {
amal_compiler *compiler;
- ParserThreadData *parser_thread_data;
Parser *parser;
ThreadWorkType work_type;
} CompilerGenericThreadUserData;
@@ -184,19 +166,25 @@ typedef struct {
ThreadWorkType type;
} ThreadWorkData;
-static CHECK_RESULT int amal_compiler_load_in_this_thread(amal_compiler *compiler, FileScopeReference *file_scope, ArenaAllocator *allocator) {
+static CHECK_RESULT int amal_compiler_load_in_this_thread(amal_compiler *compiler, FileScopeReference *file_scope) {
Parser *parser;
int result;
BufferView filepath;
+ ArenaAllocator *parser_allocator;
result = AMAL_COMPILER_ERR;
+ cleanup_if_error(amal_mutex_lock(&compiler->mutex, "amal_compiler_load_in_this_thread, create arena allocator"));
+ return_if_error(arena_allocator_alloc(&compiler->allocator, sizeof(ArenaAllocator), (void**)&parser_allocator));
+ amal_mutex_tryunlock(&compiler->mutex);
+ return_if_error(arena_allocator_init(parser_allocator));
+
filepath = create_buffer_view(file_scope->canonical_path.data, file_scope->canonical_path.size);
amal_log_info("Started parsing %.*s", filepath.size, filepath.data);
- return_if_error(arena_allocator_alloc(allocator, sizeof(Parser), (void**)&parser));
- return_if_error(parser_init(parser, compiler, allocator));
+ return_if_error(arena_allocator_alloc(parser_allocator, sizeof(Parser), (void**)&parser));
+ return_if_error(parser_init(parser, compiler, parser_allocator));
file_scope->parser = parser;
return_if_error(parser_parse_file(parser, filepath));
- cleanup_if_error(amal_mutex_lock(&compiler->mutex, "amal_compiler_load_in_this_thread"));
+ cleanup_if_error(amal_mutex_lock(&compiler->mutex, "amal_compiler_load_in_this_thread, add parser"));
cleanup_if_error(buffer_append(&compiler->parsers, &parser, sizeof(parser)));
amal_log_info("Finished parsing %.*s", filepath.size, filepath.data);
result = AMAL_COMPILER_OK;
@@ -206,51 +194,12 @@ static CHECK_RESULT int amal_compiler_load_in_this_thread(amal_compiler *compile
return result;
}
-static void* thread_callback_parse_file(void *userdata) {
- FileScopeReference *file_scope;
- CompilerParserThreadUserData compiler_parser_userdata;
- void *result;
- assert(!amal_thread_is_main());
-
- am_memcpy(&compiler_parser_userdata, userdata, sizeof(compiler_parser_userdata));
+static int thread_callback_parse_file(void *userdata) {
+ int result;
+ CompilerParserThreadUserData *compiler_parser_userdata = userdata;
+ result = amal_compiler_load_in_this_thread(compiler_parser_userdata->compiler,
+ compiler_parser_userdata->file_scope);
am_free(userdata);
- file_scope = compiler_parser_userdata.file_scope;
- result = (void*)AMAL_COMPILER_ERR;
- for(;;) {
- int has_next;
- /* Abort job if another job failed */
- if(compiler_parser_userdata.compiler->work_failed) {
- result = (void*)AMAL_COMPILER_WORK_FAIL_ABORT;
- goto cleanup;
- }
- cleanup_if_error(amal_compiler_load_in_this_thread(compiler_parser_userdata.compiler,
- file_scope,
- &compiler_parser_userdata.parser_thread_data->allocator));
- cleanup_if_error(amal_mutex_lock(&compiler_parser_userdata.compiler->mutex, "thread_callback_parse_file"));
- has_next = buffer_pop(&compiler_parser_userdata.compiler->queued_files, &file_scope, sizeof(FileScopeReference*));
- amal_mutex_tryunlock(&compiler_parser_userdata.compiler->mutex);
- if(has_next != 0)
- break;
- }
- result = NULL;
-
- cleanup:
- /*
- To stop all other parsers from working cleanly, we simply clear the file queue,
- and the other threads will stop when they are done with the file they are currently parsing.
- */
- if(result != NULL) {
- ignore_result_int(amal_mutex_lock(&compiler_parser_userdata.compiler->mutex, "thread_callback_parse_file"));
- buffer_clear(&compiler_parser_userdata.compiler->queued_files);
- }
- /*
- There can be a data race between writing to this and when this is read in @amal_compiler_load_file_join_threads.
- This is intentional and it's ok, because the join_threads function checks against this status is guaranteed at this point
- to not be a certain value that it checks for. Writing to an int is atomic.
- TODO: Verify this is ok on all platforms.
- */
- compiler_parser_userdata.parser_thread_data->status = PARSER_THREAD_STATUS_IDLE;
- amal_mutex_tryunlock(&compiler_parser_userdata.compiler->mutex);
return result;
}
@@ -302,195 +251,72 @@ static CHECK_RESULT int thread_generate_bytecode(Parser *parser) {
return result;
}
-/* TODO: Handle errors (stop work in all other threads and report errors/warnings) */
-static void* thread_callback_generic(void *userdata) {
- CompilerGenericThreadUserData compiler_userdata;
- Parser *parser;
- void *result;
- assert(!amal_thread_is_main());
-
- am_memcpy(&compiler_userdata, userdata, sizeof(compiler_userdata));
- am_free(userdata);
- parser = compiler_userdata.parser;
- result = (void*)AMAL_COMPILER_ERR;
- for(;;) {
- /* TODO: stop work in all other threads on failure */
- switch(compiler_userdata.work_type) {
- case THREAD_WORK_PARSE: {
- assert(bool_false && "Thread work type can't ge 'parse' for generic work");
- break;
- }
- case THREAD_WORK_RESOLVE_AST:
- cleanup_if_error(thread_resolve_ast(compiler_userdata.compiler, parser));
- break;
- case THREAD_WORK_GENERATE_SSA:
- cleanup_if_error(thread_generate_ssa(parser));
- break;
- case THREAD_WORK_GENERATE_BYTECODE:
- cleanup_if_error(thread_generate_bytecode(parser));
- break;
- }
-
- /* Abort job if another job failed */
- if(compiler_userdata.compiler->work_failed) {
- result = (void*)AMAL_COMPILER_WORK_FAIL_ABORT;
- goto cleanup;
+static int thread_callback_generic(void *userdata) {
+ int result;
+ CompilerGenericThreadUserData *compiler_userdata = userdata;
+ switch(compiler_userdata->work_type) {
+ case THREAD_WORK_PARSE: {
+ assert(bool_false && "Thread work type can't be 'parse' for generic work");
+ break;
}
-
- /* Find next job */
- cleanup_if_error(amal_mutex_lock(&compiler_userdata.compiler->mutex, "thread_callback_generic"));
- if(compiler_userdata.compiler->generic_work_object_index + 1 >= (int)buffer_get_size(&compiler_userdata.compiler->parsers, Parser*))
+ case THREAD_WORK_RESOLVE_AST:
+ result = thread_resolve_ast(compiler_userdata->compiler, compiler_userdata->parser);
+ break;
+ case THREAD_WORK_GENERATE_SSA:
+ result = thread_generate_ssa(compiler_userdata->parser);
+ break;
+ case THREAD_WORK_GENERATE_BYTECODE:
+ result = thread_generate_bytecode(compiler_userdata->parser);
break;
- ++compiler_userdata.compiler->generic_work_object_index;
- parser = *(Parser**)buffer_get(&compiler_userdata.compiler->parsers, compiler_userdata.compiler->generic_work_object_index, sizeof(parser));
- amal_mutex_tryunlock(&compiler_userdata.compiler->mutex);
- }
- result = NULL;
-
- cleanup:
- /*
- To stop all other worker threads cleanly, we simply say we are done with all work in the queue,
- and the other threads will stop when they are done with the work they are currently working on.
- */
- if(result != NULL) {
- ignore_result_int(amal_mutex_lock(&compiler_userdata.compiler->mutex, "thread_callback_generic"));
- compiler_userdata.compiler->generic_work_object_index = (int)buffer_get_size(&compiler_userdata.compiler->parsers, Parser*);
}
- compiler_userdata.parser_thread_data->status = PARSER_THREAD_STATUS_IDLE;
- amal_mutex_tryunlock(&compiler_userdata.compiler->mutex);
+ am_free(userdata);
return result;
}
-static CHECK_RESULT int amal_compiler_select_thread_for_work(amal_compiler *self, ThreadWorkData work_data, ParserThreadData **thread_selected) {
- int i;
- int result;
- ParserThreadData *parser_thread_data;
- void *thread_user_data;
- thread_user_data = NULL;
- *thread_selected = NULL;
- result = AMAL_COMPILER_OK;
-
- cleanup_if_error(amal_mutex_lock(&self->mutex, "amal_compiler_select_thread_for_work"));
- for(i = 0; i < self->usable_thread_count; ++i) {
- parser_thread_data = &self->threads[i];
- if(parser_thread_data->status == PARSER_THREAD_STATUS_RUNNING)
- continue;
-
- switch(work_data.type) {
- case THREAD_WORK_PARSE: {
- CompilerParserThreadUserData *userdata;
- cleanup_if_error(am_malloc(sizeof(CompilerParserThreadUserData), (void**)&userdata));
- thread_user_data = userdata;
- userdata->compiler = self;
- userdata->parser_thread_data = parser_thread_data;
- userdata->file_scope = work_data.value.file_scope;
- result = parser_thread_data_start(parser_thread_data, thread_callback_parse_file, userdata);
- break;
- }
- case THREAD_WORK_RESOLVE_AST:
- case THREAD_WORK_GENERATE_SSA:
- case THREAD_WORK_GENERATE_BYTECODE: {
- CompilerGenericThreadUserData *userdata;
- cleanup_if_error(am_malloc(sizeof(CompilerGenericThreadUserData), (void**)&userdata));
- thread_user_data = userdata;
- userdata->compiler = self;
- userdata->parser_thread_data = parser_thread_data;
- userdata->parser = work_data.value.parser;
- userdata->work_type = work_data.type;
- ++self->generic_work_object_index;
- result = parser_thread_data_start(parser_thread_data, thread_callback_generic, userdata);
- break;
- }
+static CHECK_RESULT int amal_compiler_add_task(amal_compiler *self, ThreadWorkData work_data) {
+ void *thread_user_data = NULL;
+ int result = AMAL_COMPILER_OK;
+
+ switch(work_data.type) {
+ case THREAD_WORK_PARSE: {
+ CompilerParserThreadUserData *userdata;
+ cleanup_if_error(am_malloc(sizeof(CompilerParserThreadUserData), (void**)&userdata));
+ thread_user_data = userdata;
+ userdata->compiler = self;
+ userdata->file_scope = work_data.value.file_scope;
+ result = thread_pool_add_task(&self->stage_task_thread_pool, thread_callback_parse_file, userdata);
+ break;
+ }
+ case THREAD_WORK_RESOLVE_AST:
+ case THREAD_WORK_GENERATE_SSA:
+ case THREAD_WORK_GENERATE_BYTECODE: {
+ CompilerGenericThreadUserData *userdata;
+ cleanup_if_error(am_malloc(sizeof(CompilerGenericThreadUserData), (void**)&userdata));
+ thread_user_data = userdata;
+ userdata->compiler = self;
+ userdata->parser = work_data.value.parser;
+ userdata->work_type = work_data.type;
+ result = thread_pool_add_task(&self->stage_task_thread_pool, thread_callback_generic, userdata);
+ break;
}
- *thread_selected = parser_thread_data;
- break;
}
cleanup:
if(result != 0)
am_free(thread_user_data);
- amal_mutex_tryunlock(&self->mutex);
- return result;
-}
-
-static CHECK_RESULT bool amal_compiler_check_all_threads_done(amal_compiler *self) {
- int i;
- bool result;
- result = bool_false;
-
- cleanup_if_error(amal_mutex_lock(&self->mutex, "amal_compiler_check_all_threads_done"));
- for(i = 0; i < self->usable_thread_count; ++i) {
- ParserThreadData *parser_thread_data;
- parser_thread_data = &self->threads[i];
- if(parser_thread_data->status == PARSER_THREAD_STATUS_RUNNING) {
- goto cleanup;
- }
- }
-
- result = bool_true;
- cleanup:
- amal_mutex_tryunlock(&self->mutex);
- return result;
-}
-
-static CHECK_RESULT int amal_compiler_load_file_join_threads(amal_compiler *self) {
- int i;
- int result;
- void *thread_return_data;
- ParserThreadData *parser_thread_data;
-
- assert(amal_thread_is_main());
- thread_return_data = NULL;
- for(;;) {
- bool done;
- /*
- Joining running threads. After checking one running thread another one might start up,
- so this is mostly to wait for threads to finish and to sleep without doing work.
- The check after that (amal_compiler_all_threads_done) check that all threads are done correctly
- */
- for(i = 0; i < self->usable_thread_count; ++i) {
- result = amal_mutex_lock(&self->mutex, "amal_compiler_load_file_join_threads, waiting for workers");
- parser_thread_data = &self->threads[i];
- amal_mutex_tryunlock(&self->mutex);
- if(result != 0)
- goto cleanup;
- /* TODO: What to do if join fails? */
- ignore_result_int(parser_thread_data_join(parser_thread_data, &thread_return_data));
- if(thread_return_data != NULL) {
- amal_log_error("Failed, waiting for jobs to finish");
- self->work_failed = bool_true;
- }
- }
-
- done = amal_compiler_check_all_threads_done(self);
- if(done)
- break;
- }
-
- result = AMAL_COMPILER_OK;
- cleanup:
- if(self->work_failed)
- result = AMAL_COMPILER_ERR;
return result;
}
static CHECK_RESULT int amal_compiler_dispatch_generic(amal_compiler *self, ThreadWorkType work_type) {
- Parser **parser;
- Parser **parser_end;
- parser = buffer_begin(&self->parsers);
- parser_end = buffer_end(&self->parsers);
- self->generic_work_object_index = 0;
+ Parser **parser = buffer_begin(&self->parsers);
+ Parser **parser_end = buffer_end(&self->parsers);
for(; parser != parser_end; ++parser) {
- ParserThreadData *thread_selected;
ThreadWorkData thread_work_data;
thread_work_data.type = work_type;
thread_work_data.value.parser = *parser;
- return_if_error(amal_compiler_select_thread_for_work(self, thread_work_data, &thread_selected));
- /* After all threads have been used, they will handle using the remaining parsers or stop if there is an error */
- if(!thread_selected)
- break;
+ return_if_error(amal_compiler_add_task(self, thread_work_data));
}
- return amal_compiler_load_file_join_threads(self);
+ return thread_pool_join_all_tasks(&self->stage_task_thread_pool) ? 0 : -1;
}
static CHECK_RESULT int amal_compiler_generate_program(amal_compiler *self) {
@@ -498,10 +324,8 @@ static CHECK_RESULT int amal_compiler_generate_program(amal_compiler *self) {
TODO: Copying the bytecode to the program can be done using multiple threads.
Use self->threads for that.
*/
- Parser **parser;
- Parser **parser_end;
- parser = buffer_begin(&self->parsers);
- parser_end = buffer_end(&self->parsers);
+ Parser **parser = buffer_begin(&self->parsers);
+ Parser **parser_end = buffer_end(&self->parsers);
for(; parser != parser_end; ++parser) {
return_if_error(amal_program_append_bytecode(self->program, &(*parser)->bytecode));
}
@@ -580,15 +404,10 @@ static CHECK_RESULT int validate_main_func(FileScopeReference *main_file_scope,
}
int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath, FileScopeReference **file_scope) {
- int result;
- ParserThreadData *parser_thread_data;
ThreadWorkData thread_work_data;
bool main_job;
bool new_entry;
- if(self->work_failed)
- return AMAL_COMPILER_WORK_FAIL_ABORT;
-
return_if_error(try_create_file_scope(self, filepath, file_scope, &new_entry));
assert(file_scope && *file_scope && (*file_scope)->canonical_path.data);
if(!new_entry) {
@@ -596,7 +415,6 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath,
return 0;
}
- result = AMAL_COMPILER_ERR;
thread_work_data.type = THREAD_WORK_PARSE;
thread_work_data.value.file_scope = *file_scope;
main_job = bool_false;
@@ -607,7 +425,7 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath,
main_job = bool_true;
}
- return_if_error(amal_compiler_select_thread_for_work(self, thread_work_data, &parser_thread_data));
+ return_if_error(amal_compiler_add_task(self, thread_work_data));
if(main_job) {
/*doc(Compiler flow)
@@ -619,8 +437,8 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath,
*/
LhsExpr *main_func;
- return_if_error(amal_compiler_load_file_join_threads(self));
- assert(amal_compiler_check_all_threads_done(self));
+ if(!thread_pool_join_all_tasks(&self->stage_task_thread_pool))
+ return -1;
amal_log_info("Finished parsing all files, resolving AST");
return_if_error(validate_main_func(*file_scope, &main_func));
@@ -631,15 +449,12 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath,
main_func->decl_flags |= DECL_FLAG_EXPORT;
return_if_error(amal_compiler_dispatch_generic(self, THREAD_WORK_RESOLVE_AST));
- assert(amal_compiler_check_all_threads_done(self));
amal_log_info("Finished resolving AST, generating SSA");
return_if_error(amal_compiler_dispatch_generic(self, THREAD_WORK_GENERATE_SSA));
- assert(amal_compiler_check_all_threads_done(self));
amal_log_info("Finished generating SSA");
return_if_error(amal_compiler_dispatch_generic(self, THREAD_WORK_GENERATE_BYTECODE));
- assert(amal_compiler_check_all_threads_done(self));
amal_log_info("Finished generating bytecode");
return_if_error(amal_compiler_generate_program(self));
@@ -648,14 +463,5 @@ int amal_compiler_internal_load_file(amal_compiler *self, const char *filepath,
return AMAL_COMPILER_OK;
}
- if(parser_thread_data)
- return AMAL_COMPILER_OK;
-
- cleanup_if_error(amal_mutex_lock(&self->mutex, "amal_compiler_load_file"));
- cleanup_if_error(buffer_append(&self->queued_files, file_scope, sizeof(FileScopeReference*)));
- result = AMAL_COMPILER_OK;
-
- cleanup:
- amal_mutex_tryunlock(&self->mutex);
- return result;
+ return 0;
}
diff --git a/src/parser.c b/src/parser.c
index 03b835e..85165d7 100644
--- a/src/parser.c
+++ b/src/parser.c
@@ -30,31 +30,6 @@ static void parser_parse_var_type(Parser *self, VariableType *result);
static void parser_parse_var_type_def(Parser *self, VariableType *result);
static void parser_queue_file(Parser *self, BufferView path, FileScopeReference **file_scope);
-int parser_thread_data_init(ParserThreadData *self) {
- am_memset(&self->allocator, 0, sizeof(self->allocator));
- am_memset(&self->thread, 0, sizeof(self->thread));
- self->status = PARSER_THREAD_STATUS_NEW;
- return arena_allocator_init(&self->allocator);
-}
-
-void parser_thread_data_deinit(ParserThreadData *self) {
- ignore_result_int(amal_thread_deinit(&self->thread));
- arena_allocator_deinit(&self->allocator);
-}
-
-int parser_thread_data_start(ParserThreadData *self, AmalThreadCallbackFunc callback_func, void *userdata) {
- return_if_error(amal_thread_deinit(&self->thread));
- return_if_error(amal_thread_create(&self->thread, AMAL_THREAD_JOINABLE, "Parser", callback_func, userdata));
- self->status = PARSER_THREAD_STATUS_RUNNING;
- return 0;
-}
-
-int parser_thread_data_join(ParserThreadData *self, void **result) {
- if(self->status == PARSER_THREAD_STATUS_NEW)
- return 0;
- return amal_thread_join(&self->thread, result);
-}
-
int parser_init(Parser *self, amal_compiler *compiler, ArenaAllocator *allocator) {
self->allocator = allocator;
self->compiler = compiler;
diff --git a/src/program.c b/src/program.c
index 17aee03..082f9fd 100644
--- a/src/program.c
+++ b/src/program.c
@@ -216,6 +216,8 @@ static CHECK_RESULT int amal_program_read_strings(amal_program *self) {
if(bytes_left_to_read(self) < strings_size)
return AMAL_PROGRAM_INVALID_STRINGS_SIZE;
+ am_free(self->string_indices);
+ self->string_indices = NULL;
if(am_malloc(sizeof(u32) * self->num_strings, (void**)&self->string_indices) != 0)
return AMAL_PROGRAM_ALLOC_FAILURE;
string_index_ptr = self->string_indices;
@@ -266,6 +268,8 @@ static CHECK_RESULT int amal_program_read_external_functions(amal_program *self)
if(bytes_left_to_read(self) < extern_funcs_size)
return AMAL_PROGRAM_INVALID_EXTERNAL_FUNCTIONS_SIZE;
+ am_free(self->extern_func_indices);
+ self->extern_func_indices = NULL;
if(am_malloc(sizeof(u32) * self->num_extern_functions, (void**)&self->extern_func_indices) != 0)
return AMAL_PROGRAM_ALLOC_FAILURE;
extern_func_index_ptr = self->extern_func_indices;
diff --git a/src/ssa/ssa.c b/src/ssa/ssa.c
index 39b5a80..500555a 100644
--- a/src/ssa/ssa.c
+++ b/src/ssa/ssa.c
@@ -551,10 +551,8 @@ static CHECK_RESULT SsaRegister funccall_generate_ssa(FunctionCall *self, AstRes
LhsExpr *func_lhs_expr;
{
- Ast **arg;
- Ast **arg_end;
- arg = buffer_begin(&self->args);
- arg_end = buffer_end(&self->args);
+ Ast **arg = buffer_begin(&self->args);
+ Ast **arg_end = buffer_end(&self->args);
for(; arg != arg_end; ++arg) {
SsaRegister arg_reg;
arg_reg = ast_generate_ssa(*arg, context);
@@ -781,10 +779,8 @@ CHECK_RESULT SsaRegister scope_named_object_generate_ssa(ScopeNamedObject *self,
}
void scope_generate_ssa(Scope *self, SsaCompilerContext *context) {
- Ast **ast;
- Ast **ast_end;
- ast = buffer_begin(&self->ast_objects);
- ast_end = buffer_end(&self->ast_objects);
+ Ast **ast = buffer_begin(&self->ast_objects);
+ Ast **ast_end = buffer_end(&self->ast_objects);
for(; ast != ast_end; ++ast) {
ignore_result_int(ast_generate_ssa(*ast, context));
}
diff --git a/src/std/alloc.c b/src/std/alloc.c
index d07e94f..44a5105 100644
--- a/src/std/alloc.c
+++ b/src/std/alloc.c
@@ -15,7 +15,7 @@ int am_malloc(usize size, void **mem) {
int am_realloc(void *mem, usize new_size, void **new_mem) {
void *new_allocated_data = realloc(mem, new_size);
- if(!new_allocated_data) {
+ if(!new_allocated_data && new_size != 0) {
amal_log_error("am_malloc: failed to reallocate memory to size %lu", new_size);
return ALLOC_FAIL;
}
diff --git a/src/std/arena_allocator.c b/src/std/arena_allocator.c
index 4934925..11fb40d 100644
--- a/src/std/arena_allocator.c
+++ b/src/std/arena_allocator.c
@@ -32,10 +32,8 @@ int arena_allocator_init(ArenaAllocator *self) {
}
static void arena_allocator_deinit_buffers(ArenaAllocator *self) {
- void **mem;
- void **mems_end;
- mem = buffer_begin(&self->mems);
- mems_end = buffer_end(&self->mems);
+ void **mem = buffer_begin(&self->mems);
+ void **mems_end = buffer_end(&self->mems);
while(mem != mems_end) {
am_free(*mem);
++mem;
diff --git a/src/std/log.c b/src/std/log.c
index 88ff0cf..59e0319 100644
--- a/src/std/log.c
+++ b/src/std/log.c
@@ -12,7 +12,7 @@ static bool mutex_initialized = bool_false;
/* Safe to call multiple times */
static void mutex_init() {
if(!mutex_initialized) {
- amal_mutex_init(&mutex);
+ ignore_result_int(amal_mutex_init(&mutex));
mutex_initialized = bool_true;
}
}
diff --git a/src/std/thread.c b/src/std/thread.c
index fb0e9ab..87362d2 100644
--- a/src/std/thread.c
+++ b/src/std/thread.c
@@ -107,13 +107,13 @@ int amal_thread_join(amal_thread *self, void **result) {
return 0;
}
-void amal_mutex_init(amal_mutex *self) {
- pthread_mutex_init(&self->mutex, NULL);
+int amal_mutex_init(amal_mutex *self) {
#ifdef AMAL_MUTEX_DEBUG
self->lock_identifier = NULL;
#endif
self->locked = bool_false;
self->owner_thread = 0;
+ return pthread_mutex_init(&self->mutex, NULL);
}
void amal_mutex_deinit(amal_mutex *self) {
diff --git a/src/std/thread_pool.c b/src/std/thread_pool.c
new file mode 100644
index 0000000..8f6e180
--- /dev/null
+++ b/src/std/thread_pool.c
@@ -0,0 +1,187 @@
+#include "../../include/std/thread_pool.h"
+#include "../../include/std/alloc.h"
+#include "../../include/std/mem.h"
+#include "../../include/std/log.h"
+
+/* Sets @result to NULL if there are no available tasks */
+static CHECK_RESULT int thread_pool_take_task(amal_thread_pool *self, amal_thread_pool_task **result) {
+ *result = NULL;
+ cleanup_if_error(amal_mutex_lock(&self->task_select_mutex, "thread_pool_take_task"));
+ if(self->num_finished_queued_tasks < (int)buffer_get_size(&self->queued_tasks, amal_thread_pool_task) && !self->dead)
+ *result = buffer_get(&self->queued_tasks, self->num_finished_queued_tasks, sizeof(amal_thread_pool_task));
+ cleanup:
+ amal_mutex_tryunlock(&self->task_select_mutex);
+ return 0;
+}
+
+static void* thread_pool_thread_callback(void *userdata) {
+ amal_thread_pool_callback_data *thread_pool_data = userdata;
+ if(thread_pool_data->thread_pool->dead)
+ goto cleanup;
+
+ if(thread_pool_data->callback(thread_pool_data->userdata) != 0) {
+ thread_pool_mark_dead(thread_pool_data->thread_pool);
+ goto cleanup;
+ }
+
+ for(;;) {
+ amal_thread_pool_task *new_task;
+ cleanup_if_error(thread_pool_take_task(thread_pool_data->thread_pool, &new_task));
+ if(!new_task)
+ break;
+
+ if(new_task->callback(new_task->userdata) != 0) {
+ thread_pool_mark_dead(thread_pool_data->thread_pool);
+ goto cleanup;
+ }
+ }
+
+ cleanup:
+ thread_pool_data->thread_pool_thread->status = THREAD_POOL_THREAD_STATUS_IDLE;
+ am_free(thread_pool_data);
+ return NULL;
+}
+
+static void thread_pool_thread_init(amal_thread_pool_thread *self) {
+ am_memset(&self->thread, 0, sizeof(self->thread));
+ self->status = THREAD_POOL_THREAD_STATUS_NEW;
+}
+
+static void thread_pool_thread_deinit(amal_thread_pool_thread *self) {
+ ignore_result_int(amal_thread_deinit(&self->thread));
+}
+
+static CHECK_RESULT int thread_pool_thread_start(amal_thread_pool_thread *self, amal_thread_pool *thread_pool, amal_thread_job_callback callback, void *userdata) {
+ amal_thread_pool_callback_data *callback_data;
+ return_if_error(am_malloc(sizeof(amal_thread_pool_callback_data), (void**)&callback_data));
+ callback_data->thread_pool = thread_pool;
+ callback_data->thread_pool_thread = self;
+ callback_data->callback = callback;
+ callback_data->userdata = userdata;
+
+ return_if_error(amal_thread_deinit(&self->thread));
+ return_if_error(amal_thread_create(&self->thread, AMAL_THREAD_JOINABLE, "thread_pool_thread_start", thread_pool_thread_callback, callback_data));
+ self->status = THREAD_POOL_THREAD_STATUS_RUNNING;
+ return 0;
+}
+
+static CHECK_RESULT int thread_pool_thread_join(amal_thread_pool_thread *self) {
+ if(self->status == THREAD_POOL_THREAD_STATUS_NEW)
+ return 0;
+ return amal_thread_join(&self->thread, NULL);
+}
+
+int thread_pool_init(amal_thread_pool *self, int num_threads) {
+ int i;
+ self->num_threads = num_threads != 0 ? num_threads : amal_get_usable_thread_count();
+ if(self->num_threads == 0) {
+ amal_log_warning("Unable to get the number of threads available on the system, using 1 thread.");
+ self->num_threads = 1;
+ }
+
+ self->dead = bool_false;
+ self->num_finished_queued_tasks = 0;
+ self->threads = NULL;
+
+ ignore_result_int(buffer_init(&self->queued_tasks, NULL));
+ return_if_error(amal_mutex_init(&self->task_select_mutex));
+ cleanup_if_error(am_malloc(self->num_threads * sizeof(amal_thread_pool_thread), (void**)&self->threads));
+ for(i = 0; i < self->num_threads; ++i)
+ thread_pool_thread_init(&self->threads[i]);
+ return 0;
+
+ cleanup:
+ am_free(self->threads);
+ self->num_threads = 0;
+ return -1;
+}
+
+void thread_pool_deinit(amal_thread_pool *self) {
+ if(self->threads) {
+ int i;
+ for(i = 0; i < self->num_threads; ++i)
+ thread_pool_thread_deinit(&self->threads[i]);
+ am_free(self->threads);
+ }
+ amal_mutex_deinit(&self->task_select_mutex);
+ buffer_deinit(&self->queued_tasks);
+}
+
+int thread_pool_add_task(amal_thread_pool *self, amal_thread_job_callback callback, void *userdata) {
+ int i;
+ bool found_available_thread = bool_false;
+ int result = -1;
+
+ if(self->dead)
+ return result;
+
+ cleanup_if_error(amal_mutex_lock(&self->task_select_mutex, "thread_pool_add_task"));
+ for(i = 0; i < self->num_threads; ++i) {
+ amal_thread_pool_thread *thread = &self->threads[i];
+ if(thread->status != THREAD_POOL_THREAD_STATUS_RUNNING) {
+ cleanup_if_error(thread_pool_thread_start(thread, self, callback, userdata));
+ found_available_thread = bool_true;
+ break;
+ }
+ }
+
+ if(!found_available_thread) {
+ amal_thread_pool_task task;
+ task.callback = callback;
+ task.userdata = userdata;
+ cleanup_if_error(buffer_append(&self->queued_tasks, &task, sizeof(task)));
+ }
+
+ result = 0;
+ cleanup:
+ amal_mutex_tryunlock(&self->task_select_mutex);
+ return result;
+}
+
+bool thread_pool_join_all_tasks(amal_thread_pool *self) {
+ bool died;
+ for(;;) {
+ /*
+ Joining running threads. After checking one running thread another one might start up,
+ so this is mostly to wait for threads to finish and to sleep without doing work.
+ The check after that (thread_pool_check_threads_finished) check that all threads have finished correctly
+ */
+ int i;
+ bool finished = bool_true;
+ for(i = 0; i < self->num_threads; ++i) {
+ amal_thread_pool_thread_status thread_status;
+ if(amal_mutex_lock(&self->task_select_mutex, "thread_pool_join_all_tasks") != 0)
+ thread_pool_mark_dead(self);
+ thread_status = self->threads[i].status;
+ amal_mutex_tryunlock(&self->task_select_mutex);
+ /* TODO: What to do if join fails? */
+ switch(thread_status) {
+ case THREAD_POOL_THREAD_STATUS_NEW:
+ break;
+ case THREAD_POOL_THREAD_STATUS_RUNNING:
+ finished = bool_false;
+ /* fallthrough */
+ case THREAD_POOL_THREAD_STATUS_IDLE:
+ ignore_result_int(thread_pool_thread_join(&self->threads[i]));
+ break;
+ }
+ }
+
+ if(finished)
+ break;
+ }
+
+ died = self->dead;
+ self->dead = bool_false;
+ buffer_clear(&self->queued_tasks);
+ self->num_finished_queued_tasks = 0;
+ return !died;
+}
+
+void thread_pool_mark_dead(amal_thread_pool *self) {
+ self->dead = bool_true;
+}
+
+BufferView thread_pool_get_threads(amal_thread_pool *self) {
+ return create_buffer_view((const char*)self->threads, self->num_threads * sizeof(amal_thread_pool_thread));
+}
diff --git a/tests/bytecode.amal b/tests/bytecode.amal
index 0aad6de..a8ea53b 100644
--- a/tests/bytecode.amal
+++ b/tests/bytecode.amal
@@ -1,3 +1,5 @@
+const b = @import("b.amal");
+
extern const print_extern: fn;
extern const print_extern_num: fn(num: i32);