aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/Makefile3
-rw-r--r--python/olm/_compat.py21
-rw-r--r--python/olm/group_session.py21
-rw-r--r--python/olm/pk.py15
-rw-r--r--python/olm/sas.py4
-rw-r--r--python/olm/session.py21
-rw-r--r--python/olm/utility.py1
-rw-r--r--python/tests/group_session_test.py14
-rw-r--r--python/tests/pk_test.py8
-rw-r--r--python/tests/session_test.py9
10 files changed, 98 insertions, 19 deletions
diff --git a/python/Makefile b/python/Makefile
index e4d0611..16f9823 100644
--- a/python/Makefile
+++ b/python/Makefile
@@ -43,6 +43,9 @@ test: olm-python2 olm-python3
PYTHONPATH=install-temp/3 python3 -m pytest --cov --cov-branch --benchmark-disable
rm -rf install-temp
+isort:
+ isort -y -p olm
+
clean:
rm -rf python_olm.egg-info/ dist/ __pycache__/
rm -rf *.so _libolm.o
diff --git a/python/olm/_compat.py b/python/olm/_compat.py
index 91e4d1b..2ceaa33 100644
--- a/python/olm/_compat.py
+++ b/python/olm/_compat.py
@@ -44,3 +44,24 @@ def to_bytes(string):
return bytes(string, "utf-8")
raise TypeError("Invalid type {}".format(type(string)))
+
+
+def to_unicode_str(byte_string, errors="replace"):
+ """Turn a byte string into a unicode string.
+
+ Should be used everywhere where the input byte string might not be trusted
+ and may contain invalid unicode values.
+
+ Args:
+ byte_string (bytes): The bytestring that will be converted to a native
+ string.
+ errors (str, optional): The error handling scheme that should be used
+ to handle unicode decode errors. Can be one of "strict" (raise an
+ UnicodeDecodeError exception, "ignore" (remove the offending
+ characters), "replace" (replace the offending character with
+ U+FFFD), "xmlcharrefreplace" as well as any other name registered
+ with codecs.register_error that can handle UnicodeEncodeErrors.
+
+ Returns the decoded native string.
+ """
+ return byte_string.decode(encoding="utf-8", errors=errors)
diff --git a/python/olm/group_session.py b/python/olm/group_session.py
index 737d9ef..5068192 100644
--- a/python/olm/group_session.py
+++ b/python/olm/group_session.py
@@ -33,7 +33,7 @@ from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
-from ._compat import URANDOM, to_bytearray, to_bytes
+from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str
from ._finalize import track_for_finalization
@@ -176,8 +176,8 @@ class InboundGroupSession(object):
raise OlmGroupSessionError(last_error)
- def decrypt(self, ciphertext):
- # type: (AnyStr) -> Tuple[str, int]
+ def decrypt(self, ciphertext, unicode_errors="replace"):
+ # type: (AnyStr, str) -> Tuple[str, int]
"""Decrypt a message
Returns a tuple of the decrypted plain-text and the message index of
@@ -197,6 +197,13 @@ class InboundGroupSession(object):
Args:
ciphertext(str): Base64 encoded ciphertext containing the encrypted
message
+ unicode_errors(str, optional): The error handling scheme to use for
+ unicode decoding errors. The default is "replace" meaning that
+ the character that was unable to decode will be replaced with
+ the unicode replacement character (U+FFFD). Other possible
+ values are "strict", "ignore" and "xmlcharrefreplace" as well
+ as any other name registered with codecs.register_error that
+ can handle UnicodeEncodeErrors.
"""
if not ciphertext:
raise ValueError("Ciphertext can't be empty.")
@@ -223,10 +230,10 @@ class InboundGroupSession(object):
self._check_error(plaintext_length)
- plaintext = bytes_to_native_str(ffi.unpack(
- plaintext_buffer,
- plaintext_length
- ))
+ plaintext = to_unicode_str(
+ ffi.unpack(plaintext_buffer, plaintext_length),
+ errors=unicode_errors
+ )
# clear out copies of the plaintext
lib.memset(plaintext_buffer, 0, max_plaintext_length)
diff --git a/python/olm/pk.py b/python/olm/pk.py
index 193aba5..4352359 100644
--- a/python/olm/pk.py
+++ b/python/olm/pk.py
@@ -40,7 +40,7 @@ from future.utils import bytes_to_native_str
from _libolm import ffi, lib # type: ignore
-from ._compat import URANDOM, to_bytearray
+from ._compat import URANDOM, to_bytearray, to_unicode_str
from ._finalize import track_for_finalization
@@ -313,8 +313,8 @@ class PkDecryption(object):
return obj
- def decrypt(self, message):
- # type (PkMessage) -> str
+ def decrypt(self, message, unicode_errors="replace"):
+ # type (PkMessage, str) -> str
"""Decrypt a previously encrypted Pk message.
Returns the decrypted plaintext.
@@ -322,6 +322,13 @@ class PkDecryption(object):
Args:
message(PkMessage): the pk message to decrypt.
+ unicode_errors(str, optional): The error handling scheme to use for
+ unicode decoding errors. The default is "replace" meaning that
+ the character that was unable to decode will be replaced with
+ the unicode replacement character (U+FFFD). Other possible
+ values are "strict", "ignore" and "xmlcharrefreplace" as well
+ as any other name registered with codecs.register_error that
+ can handle UnicodeEncodeErrors.
"""
ephemeral_key = to_bytearray(message.ephemeral_key)
ephemeral_key_size = len(ephemeral_key)
@@ -354,7 +361,7 @@ class PkDecryption(object):
# clear out copies of the plaintext
lib.memset(plaintext_buffer, 0, max_plaintext_length)
- return bytes_to_native_str(plaintext)
+ return to_unicode_str(plaintext, errors=unicode_errors)
def _clear_pk_signing(pk_struct):
diff --git a/python/olm/sas.py b/python/olm/sas.py
index c12b7bc..bea1dd0 100644
--- a/python/olm/sas.py
+++ b/python/olm/sas.py
@@ -30,15 +30,15 @@ Examples:
"""
-from functools import wraps
from builtins import bytes
+from functools import wraps
from typing import Optional
from future.utils import bytes_to_native_str
from _libolm import ffi, lib
-from ._compat import URANDOM, to_bytes, to_bytearray
+from ._compat import URANDOM, to_bytearray, to_bytes
from ._finalize import track_for_finalization
diff --git a/python/olm/session.py b/python/olm/session.py
index cba9be0..636eb3d 100644
--- a/python/olm/session.py
+++ b/python/olm/session.py
@@ -40,7 +40,7 @@ from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module
from _libolm import ffi, lib # type: ignore
-from ._compat import URANDOM, to_bytearray, to_bytes
+from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str
from ._finalize import track_for_finalization
# This is imported only for type checking purposes
@@ -273,8 +273,8 @@ class Session(object):
else: # pragma: no cover
raise ValueError("Unknown message type")
- def decrypt(self, message):
- # type: (_OlmMessage) -> str
+ def decrypt(self, message, unicode_errors="replace"):
+ # type: (_OlmMessage, str) -> str
"""Decrypts a message using the session. Returns the plaintext string
on success. Raises OlmSessionError on failure. If the base64 couldn't
be decoded then the error message will be "INVALID_BASE64". If the
@@ -285,7 +285,14 @@ class Session(object):
Args:
message(OlmMessage): The Olm message that will be decrypted. It can
- be either an OlmPreKeyMessage or an OlmMessage.
+ be either an OlmPreKeyMessage or an OlmMessage.
+ unicode_errors(str, optional): The error handling scheme to use for
+ unicode decoding errors. The default is "replace" meaning that
+ the character that was unable to decode will be replaced with
+ the unicode replacement character (U+FFFD). Other possible
+ values are "strict", "ignore" and "xmlcharrefreplace" as well
+ as any other name registered with codecs.register_error that
+ can handle UnicodeEncodeErrors.
"""
if not message.ciphertext:
raise ValueError("Ciphertext can't be empty")
@@ -311,8 +318,10 @@ class Session(object):
plaintext_buffer, max_plaintext_length
)
self._check_error(plaintext_length)
- plaintext = bytes_to_native_str(
- ffi.unpack(plaintext_buffer, plaintext_length))
+ plaintext = to_unicode_str(
+ ffi.unpack(plaintext_buffer, plaintext_length),
+ errors=unicode_errors
+ )
# clear out copies of the plaintext
lib.memset(plaintext_buffer, 0, max_plaintext_length)
diff --git a/python/olm/utility.py b/python/olm/utility.py
index 10d5ab4..bddef38 100644
--- a/python/olm/utility.py
+++ b/python/olm/utility.py
@@ -32,6 +32,7 @@ Examples:
# pylint: disable=redefined-builtin,unused-import
from typing import AnyStr, Type
+
from future.utils import bytes_to_native_str
# pylint: disable=no-name-in-module
diff --git a/python/tests/group_session_test.py b/python/tests/group_session_test.py
index c17e84f..4632a60 100644
--- a/python/tests/group_session_test.py
+++ b/python/tests/group_session_test.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
import pytest
from olm import InboundGroupSession, OlmGroupSessionError, OutboundGroupSession
@@ -112,3 +113,16 @@ class TestClass(object):
outbound = OutboundGroupSession()
inbound = InboundGroupSession(outbound.session_key)
del inbound
+
+ def test_invalid_unicode_decrypt(self):
+ outbound = OutboundGroupSession()
+ inbound = InboundGroupSession(outbound.session_key)
+
+ text = outbound.encrypt(b"\xed")
+ plaintext, _ = inbound.decrypt(text)
+
+ print(plaintext)
+ assert plaintext == u"�"
+
+ plaintext, _ = inbound.decrypt(text, "ignore")
+ assert plaintext == ""
diff --git a/python/tests/pk_test.py b/python/tests/pk_test.py
index fe3b4b6..ef87465 100644
--- a/python/tests/pk_test.py
+++ b/python/tests/pk_test.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
import pytest
from olm import (PkDecryption, PkDecryptionError, PkEncryption, PkSigning,
@@ -55,3 +56,10 @@ class TestClass(object):
message = "This statement is true"
signature = signing.sign(message)
ed25519_verify(signing.public_key, message, signature)
+
+ def test_invalid_unicode_decrypt(self):
+ decryption = PkDecryption()
+ encryption = PkEncryption(decryption.public_key)
+ message = encryption.encrypt(b"\xed")
+ plaintext = decryption.decrypt(message)
+ assert plaintext == u"�"
diff --git a/python/tests/session_test.py b/python/tests/session_test.py
index ab1c38b..b856585 100644
--- a/python/tests/session_test.py
+++ b/python/tests/session_test.py
@@ -1,3 +1,4 @@
+# -*- coding: utf-8 -*-
import pytest
from olm import (Account, InboundSession, OlmMessage, OlmPreKeyMessage,
@@ -141,3 +142,11 @@ class TestClass(object):
new_message = new_session.encrypt(plaintext)
assert bob_session.matches(new_message) is False
+
+ def test_invalid_unicode_decrypt(self):
+ alice, bob, session = self._create_session()
+ message = session.encrypt(b"\xed")
+
+ bob_session = InboundSession(bob, message)
+ plaintext = bob_session.decrypt(message)
+ assert plaintext == u"�"