aboutsummaryrefslogtreecommitdiff
path: root/python/olm
diff options
context:
space:
mode:
authorHubert Chathi <hubert@uhoreg.ca>2018-10-16 00:31:56 -0400
committerHubert Chathi <hubert@uhoreg.ca>2018-10-16 00:31:56 -0400
commit5ef6a844d6fd3d58d1eb85dcd188ac6b6baa3fbe (patch)
tree267b23b74f57cc1d017dea8b844e318201fb5db9 /python/olm
parent357d4ff4795d89d623663b3996ddd2dfd4990971 (diff)
overwrite buffers that may contain sensitive data
also reduce the amount of memory copying that we do
Diffstat (limited to 'python/olm')
-rw-r--r--python/olm/_compat.py10
-rw-r--r--python/olm/account.py62
-rw-r--r--python/olm/group_session.py165
-rw-r--r--python/olm/session.py106
-rw-r--r--python/olm/utility.py19
5 files changed, 234 insertions, 128 deletions
diff --git a/python/olm/_compat.py b/python/olm/_compat.py
index 8f1670d..91e4d1b 100644
--- a/python/olm/_compat.py
+++ b/python/olm/_compat.py
@@ -26,6 +26,16 @@ except ImportError: # pragma: no cover
URANDOM = urandom # type: ignore
+def to_bytearray(string):
+ # type: (AnyStr) -> bytes
+ if isinstance(string, bytes):
+ return bytearray(string)
+ elif isinstance(string, str):
+ return bytearray(string, "utf-8")
+
+ raise TypeError("Invalid type {}".format(type(string)))
+
+
def to_bytes(string):
# type: (AnyStr) -> bytes
if isinstance(string, bytes):
diff --git a/python/olm/account.py b/python/olm/account.py
index 1dba96d..8455655 100644
--- a/python/olm/account.py
+++ b/python/olm/account.py
@@ -37,7 +37,7 @@ from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
-from ._compat import URANDOM, to_bytes
+from ._compat import URANDOM, to_bytearray
from ._finalize import track_for_finalization
# This is imported only for type checking purposes
@@ -82,12 +82,12 @@ class Account(object):
random_length = lib.olm_create_account_random_length(self._account)
random = URANDOM(random_length)
- random_buffer = ffi.new("char[]", random)
self._check_error(
- lib.olm_create_account(self._account, random_buffer,
+ lib.olm_create_account(self._account, ffi.from_buffer(random),
random_length))
+
def _check_error(self, ret):
# type: (int) -> None
if ret != lib.olm_error():
@@ -111,15 +111,23 @@ class Account(object):
passphrase(str, optional): The passphrase to be used to encrypt
the account.
"""
- byte_key = bytes(passphrase, "utf-8") if passphrase else b""
- key_buffer = ffi.new("char[]", byte_key)
+ byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
pickle_length = lib.olm_pickle_account_length(self._account)
pickle_buffer = ffi.new("char[]", pickle_length)
- self._check_error(
- lib.olm_pickle_account(self._account, key_buffer, len(byte_key),
- pickle_buffer, pickle_length))
+ try:
+ self._check_error(
+ lib.olm_pickle_account(self._account,
+ ffi.from_buffer(byte_key),
+ len(byte_key),
+ pickle_buffer,
+ pickle_length))
+ finally:
+ # zero out copies of the passphrase
+ for i in range(0, len(byte_key)):
+ byte_key[i] = 0
+
return ffi.unpack(pickle_buffer, pickle_length)
@classmethod
@@ -143,15 +151,22 @@ class Account(object):
if not pickle:
raise ValueError("Pickle can't be empty")
- byte_key = bytes(passphrase, "utf-8") if passphrase else b""
- key_buffer = ffi.new("char[]", byte_key)
+ byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
+ # copy because unpickle will destroy the buffer
pickle_buffer = ffi.new("char[]", pickle)
obj = cls.__new__(cls)
- ret = lib.olm_unpickle_account(obj._account, key_buffer, len(byte_key),
- pickle_buffer, len(pickle))
- obj._check_error(ret)
+ try:
+ ret = lib.olm_unpickle_account(obj._account,
+ ffi.from_buffer(byte_key),
+ len(byte_key),
+ pickle_buffer,
+ len(pickle))
+ obj._check_error(ret)
+ finally:
+ for i in range(0, len(byte_key)):
+ byte_key[i] = 0
return obj
@@ -178,14 +193,21 @@ class Account(object):
Args:
message(str): The message to sign.
"""
- bytes_message = to_bytes(message)
+ bytes_message = to_bytearray(message)
out_length = lib.olm_account_signature_length(self._account)
- message_buffer = ffi.new("char[]", bytes_message)
out_buffer = ffi.new("char[]", out_length)
- self._check_error(
- lib.olm_account_sign(self._account, message_buffer,
- len(bytes_message), out_buffer, out_length))
+ try:
+ self._check_error(
+ lib.olm_account_sign(self._account,
+ ffi.from_buffer(bytes_message),
+ len(bytes_message), out_buffer,
+ out_length))
+ finally:
+ # clear out copies of the message, which may be plaintext
+ if bytes_message is not message:
+ for i in range(0, len(bytes_message)):
+ bytes_message[i] = 0
return bytes_to_native_str(ffi.unpack(out_buffer, out_length))
@@ -214,10 +236,10 @@ class Account(object):
random_length = lib.olm_account_generate_one_time_keys_random_length(
self._account, count)
random = URANDOM(random_length)
- random_buffer = ffi.new("char[]", random)
+
self._check_error(
lib.olm_account_generate_one_time_keys(
- self._account, count, random_buffer, random_length))
+ self._account, count, ffi.from_buffer(random), random_length))
@property
def one_time_keys(self):
diff --git a/python/olm/group_session.py b/python/olm/group_session.py
index bbb5e56..814ce27 100644
--- a/python/olm/group_session.py
+++ b/python/olm/group_session.py
@@ -33,7 +33,7 @@ from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
-from ._compat import URANDOM, to_bytes
+from ._compat import URANDOM, to_bytearray, to_bytes
from ._finalize import track_for_finalization
@@ -78,12 +78,17 @@ class InboundGroupSession(object):
if False: # pragma: no cover
self._session = self._session # type: ffi.cdata
- byte_session_key = to_bytes(session_key)
-
- key_buffer = ffi.new("char[]", byte_session_key)
- ret = lib.olm_init_inbound_group_session(
- self._session, key_buffer, len(byte_session_key)
- )
+ byte_session_key = to_bytearray(session_key)
+
+ try:
+ ret = lib.olm_init_inbound_group_session(
+ self._session,
+ ffi.from_buffer(byte_session_key), len(byte_session_key)
+ )
+ finally:
+ if byte_session_key is not session_key:
+ for i in range(0, len(byte_session_key)):
+ byte_session_key[i] = 0
self._check_error(ret)
def pickle(self, passphrase=""):
@@ -98,19 +103,23 @@ class InboundGroupSession(object):
passphrase(str, optional): The passphrase to be used to encrypt
the session.
"""
- byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
+ byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
- passphrase_buffer = ffi.new("char[]", byte_passphrase)
pickle_length = lib.olm_pickle_inbound_group_session_length(
self._session)
pickle_buffer = ffi.new("char[]", pickle_length)
- ret = lib.olm_pickle_inbound_group_session(
- self._session, passphrase_buffer, len(byte_passphrase),
- pickle_buffer, pickle_length
- )
-
- self._check_error(ret)
+ try:
+ ret = lib.olm_pickle_inbound_group_session(
+ self._session,
+ ffi.from_buffer(byte_passphrase), len(byte_passphrase),
+ pickle_buffer, pickle_length
+ )
+ self._check_error(ret)
+ finally:
+ # clear out copies of the passphrase
+ for i in range(0, len(byte_passphrase)):
+ byte_passphrase[i] = 0
return ffi.unpack(pickle_buffer, pickle_length)
@@ -135,20 +144,25 @@ class InboundGroupSession(object):
if not pickle:
raise ValueError("Pickle can't be empty")
- byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
- passphrase_buffer = ffi.new("char[]", byte_passphrase)
+ byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
+ # copy because unpickle will destroy the buffer
pickle_buffer = ffi.new("char[]", pickle)
obj = cls.__new__(cls)
- ret = lib.olm_unpickle_inbound_group_session(
- obj._session,
- passphrase_buffer,
- len(byte_passphrase),
- pickle_buffer,
- len(pickle)
- )
- obj._check_error(ret)
+ try:
+ ret = lib.olm_unpickle_inbound_group_session(
+ obj._session,
+ ffi.from_buffer(byte_passphrase),
+ len(byte_passphrase),
+ pickle_buffer,
+ len(pickle)
+ )
+ obj._check_error(ret)
+ finally:
+ # clear out copies of the passphrase
+ for i in range(0, len(byte_passphrase)):
+ byte_passphrase[i] = 0
return obj
@@ -189,12 +203,15 @@ class InboundGroupSession(object):
byte_ciphertext = to_bytes(ciphertext)
+ # copy because max_plaintext_length will destroy the buffer
ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length(
self._session, ciphertext_buffer, len(byte_ciphertext)
)
+ self._check_error(max_plaintext_length)
plaintext_buffer = ffi.new("char[]", max_plaintext_length)
+ # copy because max_plaintext_length will destroy the buffer
ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
message_index = ffi.new("uint32_t*")
@@ -206,10 +223,15 @@ class InboundGroupSession(object):
self._check_error(plaintext_length)
- return bytes_to_native_str(ffi.unpack(
+ plaintext = bytes_to_native_str(ffi.unpack(
plaintext_buffer,
plaintext_length
- )), message_index[0]
+ ))
+
+ # clear out copies of the plaintext
+ lib.memset(plaintext_buffer, 0, max_plaintext_length)
+
+ return plaintext, message_index[0]
@property
def id(self):
@@ -281,15 +303,19 @@ class InboundGroupSession(object):
"""
obj = cls.__new__(cls)
- byte_session_key = to_bytes(session_key)
+ byte_session_key = to_bytearray(session_key)
- key_buffer = ffi.new("char[]", byte_session_key)
- ret = lib.olm_import_inbound_group_session(
- obj._session,
- key_buffer,
- len(byte_session_key)
- )
- obj._check_error(ret)
+ try:
+ ret = lib.olm_import_inbound_group_session(
+ obj._session,
+ ffi.from_buffer(byte_session_key),
+ len(byte_session_key)
+ )
+ obj._check_error(ret)
+ finally:
+ if byte_session_key is not session_key:
+ for i in range(0, len(byte_session_key)):
+ byte_session_key[i] = 0
return obj
@@ -323,10 +349,9 @@ class OutboundGroupSession(object):
self._session
)
random = URANDOM(random_length)
- random_buffer = ffi.new("char[]", random)
ret = lib.olm_init_outbound_group_session(
- self._session, random_buffer, random_length
+ self._session, ffi.from_buffer(random), random_length
)
self._check_error(ret)
@@ -353,17 +378,23 @@ class OutboundGroupSession(object):
passphrase(str, optional): The passphrase to be used to encrypt
the session.
"""
- byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
- passphrase_buffer = ffi.new("char[]", byte_passphrase)
+ byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
pickle_length = lib.olm_pickle_outbound_group_session_length(
self._session)
pickle_buffer = ffi.new("char[]", pickle_length)
- ret = lib.olm_pickle_outbound_group_session(
- self._session, passphrase_buffer, len(byte_passphrase),
- pickle_buffer, pickle_length
- )
- self._check_error(ret)
+ try:
+ ret = lib.olm_pickle_outbound_group_session(
+ self._session,
+ ffi.from_buffer(byte_passphrase), len(byte_passphrase),
+ pickle_buffer, pickle_length
+ )
+ self._check_error(ret)
+ finally:
+ # clear out copies of the passphrase
+ for i in range(0, len(byte_passphrase)):
+ byte_passphrase[i] = 0
+
return ffi.unpack(pickle_buffer, pickle_length)
@classmethod
@@ -387,20 +418,25 @@ class OutboundGroupSession(object):
if not pickle:
raise ValueError("Pickle can't be empty")
- byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
- passphrase_buffer = ffi.new("char[]", byte_passphrase)
+ byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
+ # copy because unpickle will destroy the buffer
pickle_buffer = ffi.new("char[]", pickle)
obj = cls.__new__(cls)
- ret = lib.olm_unpickle_outbound_group_session(
- obj._session,
- passphrase_buffer,
- len(byte_passphrase),
- pickle_buffer,
- len(pickle)
- )
- obj._check_error(ret)
+ try:
+ ret = lib.olm_unpickle_outbound_group_session(
+ obj._session,
+ ffi.from_buffer(byte_passphrase),
+ len(byte_passphrase),
+ pickle_buffer,
+ len(pickle)
+ )
+ obj._check_error(ret)
+ finally:
+ # clear out copies of the passphrase
+ for i in range(0, len(byte_passphrase)):
+ byte_passphrase[i] = 0
return obj
@@ -414,21 +450,26 @@ class OutboundGroupSession(object):
plaintext(str): A string that will be encrypted using the group
session.
"""
- byte_plaintext = to_bytes(plaintext)
+ byte_plaintext = to_bytearray(plaintext)
message_length = lib.olm_group_encrypt_message_length(
self._session, len(byte_plaintext)
)
message_buffer = ffi.new("char[]", message_length)
- plaintext_buffer = ffi.new("char[]", byte_plaintext)
+ try:
+ ret = lib.olm_group_encrypt(
+ self._session,
+ ffi.from_buffer(byte_plaintext), len(byte_plaintext),
+ message_buffer, message_length,
+ )
+ self._check_error(ret)
+ finally:
+ # clear out copies of plaintext
+ if byte_plaintext is not plaintext:
+ for i in range(0, len(byte_plaintext)):
+ byte_plaintext[i] = 0
- ret = lib.olm_group_encrypt(
- self._session,
- plaintext_buffer, len(byte_plaintext),
- message_buffer, message_length,
- )
- self._check_error(ret)
return bytes_to_native_str(ffi.unpack(message_buffer, message_length))
@property
diff --git a/python/olm/session.py b/python/olm/session.py
index b123e8a..cba9be0 100644
--- a/python/olm/session.py
+++ b/python/olm/session.py
@@ -40,7 +40,7 @@ from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
-from ._compat import URANDOM, to_bytes
+from ._compat import URANDOM, to_bytearray, to_bytes
from ._finalize import track_for_finalization
# This is imported only for type checking purposes
@@ -164,15 +164,22 @@ class Session(object):
passphrase(str, optional): The passphrase to be used to encrypt
the session.
"""
- byte_key = bytes(passphrase, "utf-8") if passphrase else b""
- key_buffer = ffi.new("char[]", byte_key)
+ byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
pickle_length = lib.olm_pickle_session_length(self._session)
pickle_buffer = ffi.new("char[]", pickle_length)
- self._check_error(
- lib.olm_pickle_session(self._session, key_buffer, len(byte_key),
- pickle_buffer, pickle_length))
+ try:
+ self._check_error(
+ lib.olm_pickle_session(self._session,
+ ffi.from_buffer(byte_key),
+ len(byte_key),
+ pickle_buffer, pickle_length))
+ finally:
+ # clear out copies of the passphrase
+ for i in range(0, len(byte_key)):
+ byte_key[i] = 0
+
return ffi.unpack(pickle_buffer, pickle_length)
@classmethod
@@ -196,16 +203,23 @@ class Session(object):
if not pickle:
raise ValueError("Pickle can't be empty")
- byte_key = bytes(passphrase, "utf-8") if passphrase else b""
- key_buffer = ffi.new("char[]", byte_key)
+ byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
+ # copy because unpickle will destroy the buffer
pickle_buffer = ffi.new("char[]", pickle)
session = cls.__new__(cls)
- ret = lib.olm_unpickle_session(session._session, key_buffer,
- len(byte_key), pickle_buffer,
- len(pickle))
- session._check_error(ret)
+ try:
+ ret = lib.olm_unpickle_session(session._session,
+ ffi.from_buffer(byte_key),
+ len(byte_key),
+ pickle_buffer,
+ len(pickle))
+ session._check_error(ret)
+ finally:
+ # clear out copies of the passphrase
+ for i in range(0, len(byte_key)):
+ byte_key[i] = 0
return session
@@ -217,29 +231,32 @@ class Session(object):
Args:
plaintext(str): The plaintext message that will be encrypted.
"""
- byte_plaintext = to_bytes(plaintext)
+ byte_plaintext = to_bytearray(plaintext)
r_length = lib.olm_encrypt_random_length(self._session)
random = URANDOM(r_length)
- random_buffer = ffi.new("char[]", random)
- message_type = lib.olm_encrypt_message_type(self._session)
+ try:
+ message_type = lib.olm_encrypt_message_type(self._session)
- self._check_error(message_type)
-
- ciphertext_length = lib.olm_encrypt_message_length(
- self._session, len(plaintext)
- )
- ciphertext_buffer = ffi.new("char[]", ciphertext_length)
+ self._check_error(message_type)
- plaintext_buffer = ffi.new("char[]", byte_plaintext)
+ ciphertext_length = lib.olm_encrypt_message_length(
+ self._session, len(byte_plaintext)
+ )
+ ciphertext_buffer = ffi.new("char[]", ciphertext_length)
- self._check_error(lib.olm_encrypt(
- self._session,
- plaintext_buffer, len(byte_plaintext),
- random_buffer, r_length,
- ciphertext_buffer, ciphertext_length,
- ))
+ self._check_error(lib.olm_encrypt(
+ self._session,
+ ffi.from_buffer(byte_plaintext), len(byte_plaintext),
+ ffi.from_buffer(random), r_length,
+ ciphertext_buffer, ciphertext_length,
+ ))
+ finally:
+ # clear out copies of plaintext
+ if byte_plaintext is not plaintext:
+ for i in range(0, len(byte_plaintext)):
+ byte_plaintext[i] = 0
if message_type == lib.OLM_MESSAGE_TYPE_PRE_KEY:
return OlmPreKeyMessage(
@@ -274,22 +291,34 @@ class Session(object):
raise ValueError("Ciphertext can't be empty")
byte_ciphertext = to_bytes(message.ciphertext)
+ # make a copy the ciphertext buffer, because
+ # olm_decrypt_max_plaintext_length wants to destroy something
ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
max_plaintext_length = lib.olm_decrypt_max_plaintext_length(
self._session, message.message_type, ciphertext_buffer,
len(byte_ciphertext)
)
+ self._check_error(max_plaintext_length)
plaintext_buffer = ffi.new("char[]", max_plaintext_length)
+
+ # make a copy the ciphertext buffer, because
+ # olm_decrypt_max_plaintext_length wants to destroy something
ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
plaintext_length = lib.olm_decrypt(
- self._session, message.message_type, ciphertext_buffer,
- len(byte_ciphertext), plaintext_buffer, max_plaintext_length
+ self._session, message.message_type,
+ ciphertext_buffer, len(byte_ciphertext),
+ plaintext_buffer, max_plaintext_length
)
self._check_error(plaintext_length)
- return bytes_to_native_str(
+ plaintext = bytes_to_native_str(
ffi.unpack(plaintext_buffer, plaintext_length))
+ # clear out copies of the plaintext
+ lib.memset(plaintext_buffer, 0, max_plaintext_length)
+
+ return plaintext
+
@property
def id(self):
# type: () -> str
@@ -331,16 +360,16 @@ class Session(object):
ret = None
byte_ciphertext = to_bytes(message.ciphertext)
-
+ # make a copy, because olm_matches_inbound_session(_from) will distroy
+ # it
message_buffer = ffi.new("char[]", byte_ciphertext)
if identity_key:
byte_id_key = to_bytes(identity_key)
- identity_key_buffer = ffi.new("char[]", byte_id_key)
ret = lib.olm_matches_inbound_session_from(
self._session,
- identity_key_buffer, len(byte_id_key),
+ ffi.from_buffer(byte_id_key), len(byte_id_key),
message_buffer, len(byte_ciphertext)
)
@@ -447,14 +476,11 @@ class OutboundSession(Session):
self._session)
random = URANDOM(session_random_length)
- random_buffer = ffi.new("char[]", random)
- identity_key_buffer = ffi.new("char[]", byte_id_key)
- one_time_key_buffer = ffi.new("char[]", byte_one_time)
self._check_error(lib.olm_create_outbound_session(
self._session,
account._account,
- identity_key_buffer, len(byte_id_key),
- one_time_key_buffer, len(byte_one_time),
- random_buffer, session_random_length
+ ffi.from_buffer(byte_id_key), len(byte_id_key),
+ ffi.from_buffer(byte_one_time), len(byte_one_time),
+ ffi.from_buffer(random), session_random_length
))
diff --git a/python/olm/utility.py b/python/olm/utility.py
index 1c5c41d..0a64128 100644
--- a/python/olm/utility.py
+++ b/python/olm/utility.py
@@ -36,7 +36,7 @@ from typing import AnyStr, Type
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
-from ._compat import to_bytes
+from ._compat import to_bytearray, to_bytes
from ._finalize import track_for_finalization
@@ -80,13 +80,20 @@ class _Utility(object):
cls._allocate()
byte_key = to_bytes(key)
- byte_message = to_bytes(message)
+ byte_message = to_bytearray(message)
byte_signature = to_bytes(signature)
- cls._check_error(
- lib.olm_ed25519_verify(cls._utility, byte_key, len(byte_key),
- byte_message, len(byte_message),
- byte_signature, len(byte_signature)))
+ try:
+ cls._check_error(
+ lib.olm_ed25519_verify(cls._utility, byte_key, len(byte_key),
+ ffi.from_buffer(byte_message),
+ len(byte_message),
+ byte_signature, len(byte_signature)))
+ finally:
+ # clear out copies of the message, which may be a plaintext
+ if byte_message is not message:
+ for i in range(0, len(byte_message)):
+ byte_message[i] = 0
def ed25519_verify(key, message, signature):