aboutsummaryrefslogtreecommitdiff
path: root/python/olm/session.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2016-05-23 18:03:32 +0100
committerRichard van der Hoff <richard@matrix.org>2016-05-23 18:03:32 +0100
commitea130cae0db3d68a49ae2059b2568eb8504e84b6 (patch)
treecda9cb4f02373c2babcadb65b99564d7599512ba /python/olm/session.py
parentaacf1154684885d244182eb9cd68e429d72ee57a (diff)
parent7ccaae564aa0d62aecd50e0cd05cc11af58b2d25 (diff)
Merge branch 'rav/refactor_python_wrappers'
Diffstat (limited to 'python/olm/session.py')
-rw-r--r--python/olm/session.py192
1 files changed, 192 insertions, 0 deletions
diff --git a/python/olm/session.py b/python/olm/session.py
new file mode 100644
index 0000000..308f220
--- /dev/null
+++ b/python/olm/session.py
@@ -0,0 +1,192 @@
+from ._base import *
+
+
+lib.olm_session_size.argtypes = []
+lib.olm_session_size.restype = c_size_t
+
+lib.olm_session.argtypes = [c_void_p]
+lib.olm_session.restype = c_void_p
+
+lib.olm_session_last_error.argtypes = [c_void_p]
+lib.olm_session_last_error.restype = c_char_p
+
+
+def session_errcheck(res, func, args):
+ if res == ERR:
+ raise OlmError("%s: %s" % (
+ func.__name__, lib.olm_session_last_error(args[0])
+ ))
+ return res
+
+
+def session_function(func, *types):
+ func.argtypes = (c_void_p,) + types
+ func.restypes = c_size_t
+ func.errcheck = session_errcheck
+
+session_function(lib.olm_session_last_error)
+session_function(
+ lib.olm_pickle_session, c_void_p, c_size_t, c_void_p, c_size_t
+)
+session_function(
+ lib.olm_unpickle_session, c_void_p, c_size_t, c_void_p, c_size_t
+)
+session_function(lib.olm_create_outbound_session_random_length)
+session_function(
+ lib.olm_create_outbound_session,
+ c_void_p, # Account
+ c_void_p, c_size_t, # Identity Key
+ c_void_p, c_size_t, # One Time Key
+ c_void_p, c_size_t, # Random
+)
+session_function(
+ lib.olm_create_inbound_session,
+ c_void_p, # Account
+ c_void_p, c_size_t, # Pre Key Message
+)
+session_function(
+ lib.olm_create_inbound_session_from,
+ c_void_p, # Account
+ c_void_p, c_size_t, # Identity Key
+ c_void_p, c_size_t, # Pre Key Message
+)
+session_function(lib.olm_session_id_length)
+session_function(lib.olm_session_id, c_void_p, c_size_t)
+session_function(lib.olm_matches_inbound_session, c_void_p, c_size_t)
+session_function(
+ lib.olm_matches_inbound_session_from,
+ c_void_p, c_size_t, # Identity Key
+ c_void_p, c_size_t, # Pre Key Message
+)
+session_function(lib.olm_encrypt_message_type)
+session_function(lib.olm_encrypt_random_length)
+session_function(lib.olm_encrypt_message_length, c_size_t)
+session_function(
+ lib.olm_encrypt,
+ c_void_p, c_size_t, # Plaintext
+ c_void_p, c_size_t, # Random
+ c_void_p, c_size_t, # Message
+);
+session_function(
+ lib.olm_decrypt_max_plaintext_length,
+ c_size_t, # Message Type
+ c_void_p, c_size_t, # Message
+)
+session_function(
+ lib.olm_decrypt,
+ c_size_t, # Message Type
+ c_void_p, c_size_t, # Message
+ c_void_p, c_size_t, # Plaintext
+)
+
+class Session(object):
+ def __init__(self):
+ self.buf = create_string_buffer(lib.olm_session_size())
+ self.ptr = lib.olm_session(self.buf)
+
+ def pickle(self, key):
+ key_buffer = create_string_buffer(key)
+ pickle_length = lib.olm_pickle_session_length(self.ptr)
+ pickle_buffer = create_string_buffer(pickle_length)
+ lib.olm_pickle_session(
+ self.ptr, key_buffer, len(key), pickle_buffer, pickle_length
+ )
+ return pickle_buffer.raw
+
+ def unpickle(self, key, pickle):
+ key_buffer = create_string_buffer(key)
+ pickle_buffer = create_string_buffer(pickle)
+ lib.olm_unpickle_session(
+ self.ptr, key_buffer, len(key), pickle_buffer, len(pickle)
+ )
+
+ def create_outbound(self, account, identity_key, one_time_key):
+ r_length = lib.olm_create_outbound_session_random_length(self.ptr)
+ random = read_random(r_length)
+ random_buffer = create_string_buffer(random)
+ identity_key_buffer = create_string_buffer(identity_key)
+ one_time_key_buffer = create_string_buffer(one_time_key)
+ lib.olm_create_outbound_session(
+ self.ptr,
+ account.ptr,
+ identity_key_buffer, len(identity_key),
+ one_time_key_buffer, len(one_time_key),
+ random_buffer, r_length
+ )
+
+ def create_inbound(self, account, one_time_key_message):
+ one_time_key_message_buffer = create_string_buffer(one_time_key_message)
+ lib.olm_create_inbound_session(
+ self.ptr,
+ account.ptr,
+ one_time_key_message_buffer, len(one_time_key_message)
+ )
+
+ def create_inbound_from(self, account, identity_key, one_time_key_message):
+ identity_key_buffer = create_string_buffer(identity_key)
+ one_time_key_message_buffer = create_string_buffer(one_time_key_message)
+ lib.olm_create_inbound_session_from(
+ self.ptr,
+ account.ptr,
+ identity_key_buffer, len(identity_key),
+ one_time_key_message_buffer, len(one_time_key_message)
+ )
+
+ def session_id(self):
+ id_length = lib.olm_session_id_length(self.ptr)
+ id_buffer = create_string_buffer(id_length)
+ lib.olm_session_id(self.ptr, id_buffer, id_length);
+ return id_buffer.raw
+
+ def matches_inbound(self, one_time_key_message):
+ one_time_key_message_buffer = create_string_buffer(one_time_key_message)
+ return bool(lib.olm_matches_inbound_session(
+ self.ptr,
+ one_time_key_message_buffer, len(one_time_key_message)
+ ))
+
+ def matches_inbound_from(self, identity_key, one_time_key_message):
+ identity_key_buffer = create_string_buffer(identity_key)
+ one_time_key_message_buffer = create_string_buffer(one_time_key_message)
+ return bool(lib.olm_matches_inbound_session(
+ self.ptr,
+ identity_key_buffer, len(identity_key),
+ one_time_key_message_buffer, len(one_time_key_message)
+ ))
+
+ def encrypt(self, plaintext):
+ r_length = lib.olm_encrypt_random_length(self.ptr)
+ random = read_random(r_length)
+ random_buffer = create_string_buffer(random)
+
+ message_type = lib.olm_encrypt_message_type(self.ptr)
+ message_length = lib.olm_encrypt_message_length(
+ self.ptr, len(plaintext)
+ )
+ message_buffer = create_string_buffer(message_length)
+
+ plaintext_buffer = create_string_buffer(plaintext)
+
+ lib.olm_encrypt(
+ self.ptr,
+ plaintext_buffer, len(plaintext),
+ random_buffer, r_length,
+ message_buffer, message_length,
+ )
+ return message_type, message_buffer.raw
+
+ def decrypt(self, message_type, message):
+ message_buffer = create_string_buffer(message)
+ max_plaintext_length = lib.olm_decrypt_max_plaintext_length(
+ self.ptr, message_type, message_buffer, len(message)
+ )
+ plaintext_buffer = create_string_buffer(max_plaintext_length)
+ message_buffer = create_string_buffer(message)
+ plaintext_length = lib.olm_decrypt(
+ self.ptr, message_type, message_buffer, len(message),
+ plaintext_buffer, max_plaintext_length
+ )
+ return plaintext_buffer.raw[:plaintext_length]
+
+ def clear(self):
+ pass