aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/Makefile12
-rw-r--r--python/olm/__init__.py7
-rw-r--r--python/olm/pk.py346
-rw-r--r--python/olm_build.py4
-rw-r--r--python/tests/pk_test.py49
5 files changed, 415 insertions, 3 deletions
diff --git a/python/Makefile b/python/Makefile
index 5da703a..546de13 100644
--- a/python/Makefile
+++ b/python/Makefile
@@ -1,15 +1,21 @@
all: olm-python2 olm-python3
-include/olm/olm.h: ../include/olm/olm.h ../include/olm/inbound_group_session.h ../include/olm/outbound_group_session.h
+OLM_HEADERS = ../include/olm/olm.h ../include/olm/inbound_group_session.h \
+ ../include/olm/outbound_group_session.h \
+
+include/olm/olm.h: $(OLM_HEADERS)
mkdir -p include/olm
$(CPP) -I dummy -I ../include ../include/olm/olm.h -o include/olm/olm.h
# add memset to the header so that we can use it to clear buffers
echo 'void *memset(void *s, int c, size_t n);' >> include/olm/olm.h
-olm-python2: include/olm/olm.h
+include/olm/pk.h: include/olm/olm.h ../include/olm/pk.h
+ $(CPP) -I dummy -I ../include ../include/olm/pk.h -o include/olm/pk.h
+
+olm-python2: include/olm/olm.h include/olm/pk.h
DEVELOP=$(DEVELOP) python2 setup.py build
-olm-python3: include/olm/olm.h
+olm-python3: include/olm/olm.h include/olm/pk.h
DEVELOP=$(DEVELOP) python3 setup.py build
install: install-python2 install-python3
diff --git a/python/olm/__init__.py b/python/olm/__init__.py
index 015c4f8..7b7423b 100644
--- a/python/olm/__init__.py
+++ b/python/olm/__init__.py
@@ -36,3 +36,10 @@ from .group_session import (
OutboundGroupSession,
OlmGroupSessionError
)
+from .pk import (
+ PkMessage,
+ PkEncryption,
+ PkDecryption,
+ PkEncryptionError,
+ PkDecryptionError
+)
diff --git a/python/olm/pk.py b/python/olm/pk.py
new file mode 100644
index 0000000..b67d5a4
--- /dev/null
+++ b/python/olm/pk.py
@@ -0,0 +1,346 @@
+# -*- coding: utf-8 -*-
+# libolm python bindings
+# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""libolm PK module.
+
+This module contains bindings to the PK part of the Olm library.
+It contains two classes PkDecryption and PkEncryption that are used to
+establish an encrypted communication channel using public key encryption.
+
+Examples:
+ >>> decryption = PkDecryption()
+ >>> encryption = PkEncryption(decryption.public_key)
+ >>> plaintext = "It's a secret to everybody."
+ >>> message = encryption.encrypt(plaintext)
+ >>> decrypted_plaintext = decryption.decrypt(message)
+
+"""
+
+from builtins import super
+from typing import AnyStr, Type
+from future.utils import bytes_to_native_str
+
+from _libolm import ffi, lib # type: ignore
+from ._finalize import track_for_finalization
+from ._compat import URANDOM, to_bytearray
+
+
+class PkEncryptionError(Exception):
+ """libolm Pk encryption exception."""
+
+
+class PkDecryptionError(Exception):
+ """libolm Pk decryption exception."""
+
+
+def _clear_pk_encryption(pk_struct):
+ lib.olm_clear_pk_encryption(pk_struct)
+
+
+class PkMessage(object):
+ """A PK encrypted message."""
+
+ def __init__(self, ephemeral_key, mac, ciphertext):
+ # type: (str, str, str) -> None
+ """Create a new PK encrypted message.
+
+ Args:
+ ephemeral_key(str): the public part of the ephemeral key
+ used (together with the recipient's key) to generate a symmetric
+ encryption key.
+ mac(str): Message Authentication Code of the encrypted message
+ ciphertext(str): The cipher text of the encrypted message
+ """
+ self.ephemeral_key = ephemeral_key
+ self.mac = mac
+ self.ciphertext = ciphertext
+
+
+class PkEncryption(object):
+ """PkEncryption class.
+
+ Represents the decryption part of a PK encrypted channel.
+ """
+
+ def __init__(self, recipient_key):
+ # type: (AnyStr) -> None
+ """Create a new PK encryption object.
+
+ Args:
+ recipient_key(str): a public key that will be used for encryption
+ """
+ if not recipient_key:
+ raise ValueError("Recipient key can't be empty")
+
+ self._buf = ffi.new("char[]", lib.olm_pk_encryption_size())
+ self._pk_encryption = lib.olm_pk_encryption(self._buf)
+ track_for_finalization(self, self._pk_encryption, _clear_pk_encryption)
+
+ byte_key = to_bytearray(recipient_key)
+ lib.olm_pk_encryption_set_recipient_key(
+ self._pk_encryption,
+ ffi.from_buffer(byte_key),
+ len(byte_key)
+ )
+
+ # clear out copies of the key
+ if byte_key is not recipient_key: # pragma: no cover
+ for i in range(0, len(byte_key)):
+ byte_key[i] = 0
+
+ def _check_error(self, ret): # pragma: no cover
+ # type: (int) -> None
+ if ret != lib.olm_error():
+ return
+
+ last_error = bytes_to_native_str(
+ ffi.string(lib.olm_pk_encryption_last_error(self._pk_encryption)))
+
+ raise PkEncryptionError(last_error)
+
+ def encrypt(self, plaintext):
+ # type: (AnyStr) -> PkMessage
+ """Encrypt a message.
+
+ Returns the encrypted PkMessage.
+
+ Args:
+ plaintext(str): A string that will be encrypted using the
+ PkEncryption object.
+ """
+ byte_plaintext = to_bytearray(plaintext)
+
+ r_length = lib.olm_pk_encrypt_random_length(self._pk_encryption)
+ random = URANDOM(r_length)
+ random_buffer = ffi.new("char[]", random)
+
+ ciphertext_length = lib.olm_pk_ciphertext_length(
+ self._pk_encryption, len(byte_plaintext)
+ )
+ ciphertext = ffi.new("char[]", ciphertext_length)
+
+ mac_length = lib.olm_pk_mac_length(self._pk_encryption)
+ mac = ffi.new("char[]", mac_length)
+
+ ephemeral_key_size = lib.olm_pk_key_length()
+ ephemeral_key = ffi.new("char[]", ephemeral_key_size)
+
+ ret = lib.olm_pk_encrypt(
+ self._pk_encryption,
+ ffi.from_buffer(byte_plaintext), len(byte_plaintext),
+ ciphertext, ciphertext_length,
+ mac, mac_length,
+ ephemeral_key, ephemeral_key_size,
+ random_buffer, r_length
+ )
+
+ try:
+ self._check_error(ret)
+ finally: # pragma: no cover
+ # clear out copies of plaintext
+ if byte_plaintext is not plaintext:
+ for i in range(0, len(byte_plaintext)):
+ byte_plaintext[i] = 0
+
+ message = PkMessage(
+ bytes_to_native_str(
+ ffi.unpack(ephemeral_key, ephemeral_key_size)),
+ bytes_to_native_str(
+ ffi.unpack(mac, mac_length)),
+ bytes_to_native_str(
+ ffi.unpack(ciphertext, ciphertext_length))
+ )
+ return message
+
+
+def _clear_pk_decryption(pk_struct):
+ lib.olm_clear_pk_decryption(pk_struct)
+
+
+class PkDecryption(object):
+ """PkDecryption class.
+
+ Represents the decryption part of a PK encrypted channel.
+
+ Attributes:
+ public_key (str): The public key of the PkDecryption object, can be
+ shared and used to create a PkEncryption object.
+
+ """
+
+ def __new__(cls):
+ # type: (Type[PkDecryption]) -> PkDecryption
+ obj = super().__new__(cls)
+ obj._buf = ffi.new("char[]", lib.olm_pk_decryption_size())
+ obj._pk_decryption = lib.olm_pk_decryption(obj._buf)
+ obj.public_key = None
+ track_for_finalization(obj, obj._pk_decryption, _clear_pk_decryption)
+ return obj
+
+ def __init__(self):
+ if False: # pragma: no cover
+ self._pk_decryption = self._pk_decryption # type: ffi.cdata
+
+ random_length = lib.olm_pk_private_key_length()
+ random = URANDOM(random_length)
+ random_buffer = ffi.new("char[]", random)
+
+ key_length = lib.olm_pk_key_length()
+ key_buffer = ffi.new("char[]", key_length)
+
+ ret = lib.olm_pk_key_from_private(
+ self._pk_decryption,
+ key_buffer, key_length,
+ random_buffer, random_length
+ )
+ self._check_error(ret)
+ self.public_key = bytes_to_native_str(ffi.unpack(
+ key_buffer,
+ key_length
+ ))
+
+ def _check_error(self, ret):
+ # type: (int) -> None
+ if ret != lib.olm_error():
+ return
+
+ last_error = bytes_to_native_str(
+ ffi.string(lib.olm_pk_decryption_last_error(self._pk_decryption)))
+
+ raise PkDecryptionError(last_error)
+
+ def pickle(self, passphrase=""):
+ # type: (str) -> bytes
+ """Store a PkDecryption object.
+
+ Stores a PkDecryption object as a base64 string. Encrypts the object
+ using the supplied passphrase. Returns a byte object containing the
+ base64 encoded string of the pickled session.
+
+ Args:
+ passphrase(str, optional): The passphrase to be used to encrypt
+ the object.
+ """
+ byte_key = to_bytearray(passphrase)
+
+ pickle_length = lib.olm_pickle_pk_decryption_length(
+ self._pk_decryption
+ )
+ pickle_buffer = ffi.new("char[]", pickle_length)
+
+ ret = lib.olm_pickle_pk_decryption(
+ self._pk_decryption,
+ ffi.from_buffer(byte_key), len(byte_key),
+ pickle_buffer, pickle_length
+ )
+ try:
+ self._check_error(ret)
+ finally:
+ # zero out copies of the passphrase
+ for i in range(0, len(byte_key)):
+ byte_key[i] = 0
+
+ return ffi.unpack(pickle_buffer, pickle_length)
+
+ @classmethod
+ def from_pickle(cls, pickle, passphrase=""):
+ # types: (bytes, str) -> PkDecryption
+ """Restore a previously stored PkDecryption object.
+
+ Creates a PkDecryption object from a pickled base64 string. Decrypts
+ the pickled object using the supplied passphrase.
+ Raises PkDecryptionError on failure. If the passphrase
+ doesn't match the one used to encrypt the session then the error
+ message for the exception will be "BAD_ACCOUNT_KEY". If the base64
+ couldn't be decoded then the error message will be "INVALID_BASE64".
+
+ Args:
+ pickle(bytes): Base64 encoded byte string containing the pickled
+ PkDecryption object
+ passphrase(str, optional): The passphrase used to encrypt the
+ object
+ """
+ if not pickle:
+ raise ValueError("Pickle can't be empty")
+
+ byte_key = to_bytearray(passphrase)
+ pickle_buffer = ffi.new("char[]", pickle)
+
+ pubkey_length = lib.olm_pk_key_length()
+ pubkey_buffer = ffi.new("char[]", pubkey_length)
+
+ obj = cls.__new__(cls)
+
+ ret = lib.olm_unpickle_pk_decryption(
+ obj._pk_decryption,
+ ffi.from_buffer(byte_key), len(byte_key),
+ pickle_buffer, len(pickle),
+ pubkey_buffer, pubkey_length)
+
+ try:
+ obj._check_error(ret)
+ finally:
+ for i in range(0, len(byte_key)):
+ byte_key[i] = 0
+
+ obj.public_key = bytes_to_native_str(ffi.unpack(
+ pubkey_buffer,
+ pubkey_length
+ ))
+
+ return obj
+
+ def decrypt(self, message):
+ # type (PkMessage) -> str
+ """Decrypt a previously encrypted Pk message.
+
+ Returns the decrypted plaintext.
+ Raises PkDecryptionError on failure.
+
+ Args:
+ message(PkMessage): the pk message to decrypt.
+ """
+ ephemeral_key = to_bytearray(message.ephemeral_key)
+ ephemeral_key_size = len(ephemeral_key)
+
+ mac = to_bytearray(message.mac)
+ mac_length = len(mac)
+
+ ciphertext = to_bytearray(message.ciphertext)
+ ciphertext_length = len(ciphertext)
+
+ max_plaintext_length = lib.olm_pk_max_plaintext_length(
+ self._pk_decryption,
+ ciphertext_length
+ )
+ plaintext_buffer = ffi.new("char[]", max_plaintext_length)
+
+ ret = lib.olm_pk_decrypt(
+ self._pk_decryption,
+ ffi.from_buffer(ephemeral_key), ephemeral_key_size,
+ ffi.from_buffer(mac), mac_length,
+ ffi.from_buffer(ciphertext), ciphertext_length,
+ plaintext_buffer, max_plaintext_length)
+ self._check_error(ret)
+
+ plaintext = (ffi.unpack(
+ plaintext_buffer,
+ ret
+ ))
+
+ # clear out copies of the plaintext
+ lib.memset(plaintext_buffer, 0, max_plaintext_length)
+
+ return bytes_to_native_str(plaintext)
diff --git a/python/olm_build.py b/python/olm_build.py
index ee836d8..d3eda79 100644
--- a/python/olm_build.py
+++ b/python/olm_build.py
@@ -39,6 +39,7 @@ ffibuilder.set_source(
#include <olm/olm.h>
#include <olm/inbound_group_session.h>
#include <olm/outbound_group_session.h>
+ #include <olm/pk.h>
""",
libraries=["olm"],
extra_compile_args=compile_args,
@@ -47,5 +48,8 @@ ffibuilder.set_source(
with open(os.path.join(PATH, "include/olm/olm.h")) as f:
ffibuilder.cdef(f.read(), override=True)
+with open(os.path.join(PATH, "include/olm/pk.h")) as f:
+ ffibuilder.cdef(f.read(), override=True)
+
if __name__ == "__main__":
ffibuilder.compile(verbose=True)
diff --git a/python/tests/pk_test.py b/python/tests/pk_test.py
new file mode 100644
index 0000000..f2aa147
--- /dev/null
+++ b/python/tests/pk_test.py
@@ -0,0 +1,49 @@
+import pytest
+
+from olm import PkDecryption, PkDecryptionError, PkEncryption
+
+
+class TestClass(object):
+ def test_invalid_encryption(self):
+ with pytest.raises(ValueError):
+ PkEncryption("")
+
+ def test_decrytion(self):
+ decryption = PkDecryption()
+ encryption = PkEncryption(decryption.public_key)
+ plaintext = "It's a secret to everybody."
+ message = encryption.encrypt(plaintext)
+ decrypted_plaintext = decryption.decrypt(message)
+ isinstance(decrypted_plaintext, str)
+ assert plaintext == decrypted_plaintext
+
+ def test_invalid_decrytion(self):
+ decryption = PkDecryption()
+ encryption = PkEncryption(decryption.public_key)
+ plaintext = "It's a secret to everybody."
+ message = encryption.encrypt(plaintext)
+ message.ephemeral_key = "?"
+ with pytest.raises(PkDecryptionError):
+ decryption.decrypt(message)
+
+ def test_pickling(self):
+ decryption = PkDecryption()
+ encryption = PkEncryption(decryption.public_key)
+ plaintext = "It's a secret to everybody."
+ message = encryption.encrypt(plaintext)
+
+ pickle = decryption.pickle()
+ unpickled = PkDecryption.from_pickle(pickle)
+ decrypted_plaintext = unpickled.decrypt(message)
+ assert plaintext == decrypted_plaintext
+
+ def test_invalid_unpickling(self):
+ with pytest.raises(ValueError):
+ PkDecryption.from_pickle("")
+
+ def test_invalid_pass_pickling(self):
+ decryption = PkDecryption()
+ pickle = decryption.pickle("Secret")
+
+ with pytest.raises(PkDecryptionError):
+ PkDecryption.from_pickle(pickle, "Not secret")