diff options
author | dec05eba <dec05eba@protonmail.com> | 2020-11-05 01:45:06 +0100 |
---|---|---|
committer | dec05eba <dec05eba@protonmail.com> | 2020-11-05 01:45:06 +0100 |
commit | 2a8202e74846d191a321cca1202175af9db6107d (patch) | |
tree | a6f455caf07da1186851f343a237a4c4e4484f46 /python/olm | |
parent | 8efa0ec17d8c262f9c3fd7603e8074f74a053708 (diff) |
Diffstat (limited to 'python/olm')
-rw-r--r-- | python/olm/__init__.py | 48 | ||||
-rw-r--r-- | python/olm/__version__.py | 9 | ||||
-rw-r--r-- | python/olm/_compat.py | 67 | ||||
-rw-r--r-- | python/olm/_finalize.py | 65 | ||||
-rw-r--r-- | python/olm/account.py | 271 | ||||
-rw-r--r-- | python/olm/group_session.py | 532 | ||||
-rw-r--r-- | python/olm/pk.py | 461 | ||||
-rw-r--r-- | python/olm/sas.py | 245 | ||||
-rw-r--r-- | python/olm/session.py | 495 | ||||
-rw-r--r-- | python/olm/utility.py | 151 |
10 files changed, 0 insertions, 2344 deletions
diff --git a/python/olm/__init__.py b/python/olm/__init__.py deleted file mode 100644 index 26257a5..0000000 --- a/python/olm/__init__.py +++ /dev/null @@ -1,48 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -Olm Python bindings -~~~~~~~~~~~~~~~~~~~~~ -| This package implements python bindings for the libolm C library. -| © Copyright 2015-2017 by OpenMarket Ltd -| © Copyright 2018 by Damir Jelić -""" -from .utility import ed25519_verify, OlmVerifyError, OlmHashError, sha256 -from .account import Account, OlmAccountError -from .session import ( - Session, - InboundSession, - OutboundSession, - OlmSessionError, - OlmMessage, - OlmPreKeyMessage -) -from .group_session import ( - InboundGroupSession, - OutboundGroupSession, - OlmGroupSessionError -) -from .pk import ( - PkMessage, - PkEncryption, - PkDecryption, - PkSigning, - PkEncryptionError, - PkDecryptionError, - PkSigningError -) -from .sas import Sas, OlmSasError diff --git a/python/olm/__version__.py b/python/olm/__version__.py deleted file mode 100644 index 498f89f..0000000 --- a/python/olm/__version__.py +++ /dev/null @@ -1,9 +0,0 @@ -__title__ = "python-olm" -__description__ = ("python CFFI bindings for the olm " - "cryptographic ratchet library") -__url__ = "https://github.com/poljar/python-olm" -__version__ = "3.2.1" -__author__ = "Damir Jelić" -__author_email__ = "poljar@termina.org.uk" -__license__ = "Apache 2.0" -__copyright__ = "Copyright 2018-2019 Damir Jelić" diff --git a/python/olm/_compat.py b/python/olm/_compat.py deleted file mode 100644 index 2ceaa33..0000000 --- a/python/olm/_compat.py +++ /dev/null @@ -1,67 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from builtins import bytes, str -from typing import AnyStr - -try: - import secrets - URANDOM = secrets.token_bytes # pragma: no cover -except ImportError: # pragma: no cover - from os import urandom - URANDOM = urandom # type: ignore - - -def to_bytearray(string): - # type: (AnyStr) -> bytes - if isinstance(string, bytes): - return bytearray(string) - elif isinstance(string, str): - return bytearray(string, "utf-8") - - raise TypeError("Invalid type {}".format(type(string))) - - -def to_bytes(string): - # type: (AnyStr) -> bytes - if isinstance(string, bytes): - return string - elif isinstance(string, str): - return bytes(string, "utf-8") - - raise TypeError("Invalid type {}".format(type(string))) - - -def to_unicode_str(byte_string, errors="replace"): - """Turn a byte string into a unicode string. - - Should be used everywhere where the input byte string might not be trusted - and may contain invalid unicode values. - - Args: - byte_string (bytes): The bytestring that will be converted to a native - string. - errors (str, optional): The error handling scheme that should be used - to handle unicode decode errors. Can be one of "strict" (raise an - UnicodeDecodeError exception, "ignore" (remove the offending - characters), "replace" (replace the offending character with - U+FFFD), "xmlcharrefreplace" as well as any other name registered - with codecs.register_error that can handle UnicodeEncodeErrors. - - Returns the decoded native string. - """ - return byte_string.decode(encoding="utf-8", errors=errors) diff --git a/python/olm/_finalize.py b/python/olm/_finalize.py deleted file mode 100644 index 9f467bc..0000000 --- a/python/olm/_finalize.py +++ /dev/null @@ -1,65 +0,0 @@ -# The MIT License (MIT) -# Copyright (c) 2010 Benjamin Peterson <benjamin@python.org> - -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: - -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. - -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. -# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE -# OR OTHER DEALINGS IN THE SOFTWARE. - -"""Finalization with weakrefs - -This is designed for avoiding __del__. -""" -from __future__ import print_function - -import sys -import traceback -import weakref - -__author__ = "Benjamin Peterson <benjamin@python.org>" - - -class OwnerRef(weakref.ref): - """A simple weakref.ref subclass, so attributes can be added.""" - pass - - -def _run_finalizer(ref): - """Internal weakref callback to run finalizers""" - del _finalize_refs[id(ref)] - finalizer = ref.finalizer - item = ref.item - try: - finalizer(item) - except Exception: # pragma: no cover - print("Exception running {}:".format(finalizer), file=sys.stderr) - traceback.print_exc() - - -_finalize_refs = {} - - -def track_for_finalization(owner, item, finalizer): - """Register an object for finalization. - - ``owner`` is the the object which is responsible for ``item``. - ``finalizer`` will be called with ``item`` as its only argument when - ``owner`` is destroyed by the garbage collector. - """ - ref = OwnerRef(owner, _run_finalizer) - ref.item = item - ref.finalizer = finalizer - _finalize_refs[id(ref)] = ref diff --git a/python/olm/account.py b/python/olm/account.py deleted file mode 100644 index 8455655..0000000 --- a/python/olm/account.py +++ /dev/null @@ -1,271 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""libolm Account module. - -This module contains the account part of the Olm library. It contains a single -Account class which handles the creation of new accounts as well as the storing -and restoring of them. - -Examples: - >>> acc = Account() - >>> account.identity_keys() - >>> account.generate_one_time_keys(1) - -""" - -import json -# pylint: disable=redefined-builtin,unused-import -from builtins import bytes, super -from typing import AnyStr, Dict, Optional, Type - -from future.utils import bytes_to_native_str - -# pylint: disable=no-name-in-module -from _libolm import ffi, lib # type: ignore - -from ._compat import URANDOM, to_bytearray -from ._finalize import track_for_finalization - -# This is imported only for type checking purposes -if False: - from .session import Session # pragma: no cover - - -def _clear_account(account): - # type: (ffi.cdata) -> None - lib.olm_clear_account(account) - - -class OlmAccountError(Exception): - """libolm Account error exception.""" - - -class Account(object): - """libolm Account class.""" - - def __new__(cls): - # type: (Type[Account]) -> Account - obj = super().__new__(cls) - obj._buf = ffi.new("char[]", lib.olm_account_size()) - obj._account = lib.olm_account(obj._buf) - track_for_finalization(obj, obj._account, _clear_account) - return obj - - def __init__(self): - # type: () -> None - """Create a new Olm account. - - Creates a new account and its matching identity key pair. - - Raises OlmAccountError on failure. If there weren't enough random bytes - for the account creation the error message for the exception will be - NOT_ENOUGH_RANDOM. - """ - # This is needed to silence mypy not knowing the type of _account. - # There has to be a better way for this. - if False: # pragma: no cover - self._account = self._account # type: ffi.cdata - - random_length = lib.olm_create_account_random_length(self._account) - random = URANDOM(random_length) - - self._check_error( - lib.olm_create_account(self._account, ffi.from_buffer(random), - random_length)) - - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str( - ffi.string((lib.olm_account_last_error(self._account)))) - - raise OlmAccountError(last_error) - - def pickle(self, passphrase=""): - # type: (Optional[str]) -> bytes - """Store an Olm account. - - Stores an account as a base64 string. Encrypts the account using the - supplied passphrase. Returns a byte object containing the base64 - encoded string of the pickled account. Raises OlmAccountError on - failure. - - Args: - passphrase(str, optional): The passphrase to be used to encrypt - the account. - """ - byte_key = bytearray(passphrase, "utf-8") if passphrase else b"" - - pickle_length = lib.olm_pickle_account_length(self._account) - pickle_buffer = ffi.new("char[]", pickle_length) - - try: - self._check_error( - lib.olm_pickle_account(self._account, - ffi.from_buffer(byte_key), - len(byte_key), - pickle_buffer, - pickle_length)) - finally: - # zero out copies of the passphrase - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - return ffi.unpack(pickle_buffer, pickle_length) - - @classmethod - def from_pickle(cls, pickle, passphrase=""): - # type: (bytes, Optional[str]) -> Account - """Load a previously stored olm account. - - Loads an account from a pickled base64-encoded string and returns an - Account object. Decrypts the account using the supplied passphrase. - Raises OlmAccountError on failure. If the passphrase doesn't match the - one used to encrypt the account then the error message for the - exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded - then the error message will be "INVALID_BASE64". - - Args: - pickle(bytes): Base64 encoded byte string containing the pickled - account - passphrase(str, optional): The passphrase used to encrypt the - account. - """ - if not pickle: - raise ValueError("Pickle can't be empty") - - byte_key = bytearray(passphrase, "utf-8") if passphrase else b"" - # copy because unpickle will destroy the buffer - pickle_buffer = ffi.new("char[]", pickle) - - obj = cls.__new__(cls) - - try: - ret = lib.olm_unpickle_account(obj._account, - ffi.from_buffer(byte_key), - len(byte_key), - pickle_buffer, - len(pickle)) - obj._check_error(ret) - finally: - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - return obj - - @property - def identity_keys(self): - # type: () -> Dict[str, str] - """dict: Public part of the identity keys of the account.""" - out_length = lib.olm_account_identity_keys_length(self._account) - out_buffer = ffi.new("char[]", out_length) - - self._check_error( - lib.olm_account_identity_keys(self._account, out_buffer, - out_length)) - return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8")) - - def sign(self, message): - # type: (AnyStr) -> str - """Signs a message with this account. - - Signs a message with the private ed25519 identity key of this account. - Returns the signature. - Raises OlmAccountError on failure. - - Args: - message(str): The message to sign. - """ - bytes_message = to_bytearray(message) - out_length = lib.olm_account_signature_length(self._account) - out_buffer = ffi.new("char[]", out_length) - - try: - self._check_error( - lib.olm_account_sign(self._account, - ffi.from_buffer(bytes_message), - len(bytes_message), out_buffer, - out_length)) - finally: - # clear out copies of the message, which may be plaintext - if bytes_message is not message: - for i in range(0, len(bytes_message)): - bytes_message[i] = 0 - - return bytes_to_native_str(ffi.unpack(out_buffer, out_length)) - - @property - def max_one_time_keys(self): - # type: () -> int - """int: The maximum number of one-time keys the account can store.""" - return lib.olm_account_max_number_of_one_time_keys(self._account) - - def mark_keys_as_published(self): - # type: () -> None - """Mark the current set of one-time keys as being published.""" - lib.olm_account_mark_keys_as_published(self._account) - - def generate_one_time_keys(self, count): - # type: (int) -> None - """Generate a number of new one-time keys. - - If the total number of keys stored by this account exceeds - max_one_time_keys() then the old keys are discarded. - Raises OlmAccountError on error. - - Args: - count(int): The number of keys to generate. - """ - random_length = lib.olm_account_generate_one_time_keys_random_length( - self._account, count) - random = URANDOM(random_length) - - self._check_error( - lib.olm_account_generate_one_time_keys( - self._account, count, ffi.from_buffer(random), random_length)) - - @property - def one_time_keys(self): - # type: () -> Dict[str, Dict[str, str]] - """dict: The public part of the one-time keys for this account.""" - out_length = lib.olm_account_one_time_keys_length(self._account) - out_buffer = ffi.new("char[]", out_length) - - self._check_error( - lib.olm_account_one_time_keys(self._account, out_buffer, - out_length)) - - return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8")) - - def remove_one_time_keys(self, session): - # type: (Session) -> None - """Remove used one-time keys. - - Removes the one-time keys that the session used from the account. - Raises OlmAccountError on failure. If the account doesn't have any - matching one-time keys then the error message of the exception will be - "BAD_MESSAGE_KEY_ID". - - Args: - session(Session): An Olm Session object that was created with this - account. - """ - self._check_error(lib.olm_remove_one_time_keys(self._account, - session._session)) diff --git a/python/olm/group_session.py b/python/olm/group_session.py deleted file mode 100644 index 5068192..0000000 --- a/python/olm/group_session.py +++ /dev/null @@ -1,532 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""libolm Group session module. - -This module contains the group session part of the Olm library. It contains two -classes for creating inbound and outbound group sessions. - -Examples: - >>> outbound = OutboundGroupSession() - >>> InboundGroupSession(outbound.session_key) -""" - -# pylint: disable=redefined-builtin,unused-import -from builtins import bytes, super -from typing import AnyStr, Optional, Tuple, Type - -from future.utils import bytes_to_native_str - -# pylint: disable=no-name-in-module -from _libolm import ffi, lib # type: ignore - -from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str -from ._finalize import track_for_finalization - - -def _clear_inbound_group_session(session): - # type: (ffi.cdata) -> None - lib.olm_clear_inbound_group_session(session) - - -def _clear_outbound_group_session(session): - # type: (ffi.cdata) -> None - lib.olm_clear_outbound_group_session(session) - - -class OlmGroupSessionError(Exception): - """libolm Group session error exception.""" - - -class InboundGroupSession(object): - """Inbound group session for encrypted multiuser communication.""" - - def __new__( - cls, # type: Type[InboundGroupSession] - session_key=None # type: Optional[str] - ): - # type: (...) -> InboundGroupSession - obj = super().__new__(cls) - obj._buf = ffi.new("char[]", lib.olm_inbound_group_session_size()) - obj._session = lib.olm_inbound_group_session(obj._buf) - track_for_finalization(obj, obj._session, _clear_inbound_group_session) - return obj - - def __init__(self, session_key): - # type: (AnyStr) -> None - """Create a new inbound group session. - Start a new inbound group session, from a key exported from - an outbound group session. - - Raises OlmGroupSessionError on failure. The error message of the - exception will be "OLM_INVALID_BASE64" if the session key is not valid - base64 and "OLM_BAD_SESSION_KEY" if the session key is invalid. - """ - if False: # pragma: no cover - self._session = self._session # type: ffi.cdata - - byte_session_key = to_bytearray(session_key) - - try: - ret = lib.olm_init_inbound_group_session( - self._session, - ffi.from_buffer(byte_session_key), len(byte_session_key) - ) - finally: - if byte_session_key is not session_key: - for i in range(0, len(byte_session_key)): - byte_session_key[i] = 0 - self._check_error(ret) - - def pickle(self, passphrase=""): - # type: (Optional[str]) -> bytes - """Store an inbound group session. - - Stores a group session as a base64 string. Encrypts the session using - the supplied passphrase. Returns a byte object containing the base64 - encoded string of the pickled session. - - Args: - passphrase(str, optional): The passphrase to be used to encrypt - the session. - """ - byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" - - pickle_length = lib.olm_pickle_inbound_group_session_length( - self._session) - pickle_buffer = ffi.new("char[]", pickle_length) - - try: - ret = lib.olm_pickle_inbound_group_session( - self._session, - ffi.from_buffer(byte_passphrase), len(byte_passphrase), - pickle_buffer, pickle_length - ) - self._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_passphrase)): - byte_passphrase[i] = 0 - - return ffi.unpack(pickle_buffer, pickle_length) - - @classmethod - def from_pickle(cls, pickle, passphrase=""): - # type: (bytes, Optional[str]) -> InboundGroupSession - """Load a previously stored inbound group session. - - Loads an inbound group session from a pickled base64 string and returns - an InboundGroupSession object. Decrypts the session using the supplied - passphrase. Raises OlmSessionError on failure. If the passphrase - doesn't match the one used to encrypt the session then the error - message for the exception will be "BAD_ACCOUNT_KEY". If the base64 - couldn't be decoded then the error message will be "INVALID_BASE64". - - Args: - pickle(bytes): Base64 encoded byte string containing the pickled - session - passphrase(str, optional): The passphrase used to encrypt the - session - """ - if not pickle: - raise ValueError("Pickle can't be empty") - - byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" - # copy because unpickle will destroy the buffer - pickle_buffer = ffi.new("char[]", pickle) - - obj = cls.__new__(cls) - - try: - ret = lib.olm_unpickle_inbound_group_session( - obj._session, - ffi.from_buffer(byte_passphrase), - len(byte_passphrase), - pickle_buffer, - len(pickle) - ) - obj._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_passphrase)): - byte_passphrase[i] = 0 - - return obj - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str(ffi.string( - lib.olm_inbound_group_session_last_error(self._session))) - - raise OlmGroupSessionError(last_error) - - def decrypt(self, ciphertext, unicode_errors="replace"): - # type: (AnyStr, str) -> Tuple[str, int] - """Decrypt a message - - Returns a tuple of the decrypted plain-text and the message index of - the decrypted message or raises OlmGroupSessionError on failure. - On failure the error message of the exception will be: - - * OLM_INVALID_BASE64 if the message is not valid base64 - * OLM_BAD_MESSAGE_VERSION if the message was encrypted with an - unsupported version of the protocol - * OLM_BAD_MESSAGE_FORMAT if the message headers could not be - decoded - * OLM_BAD_MESSAGE_MAC if the message could not be verified - * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key - corresponding to the message's index (i.e., it was sent before - the session key was shared with us) - - Args: - ciphertext(str): Base64 encoded ciphertext containing the encrypted - message - unicode_errors(str, optional): The error handling scheme to use for - unicode decoding errors. The default is "replace" meaning that - the character that was unable to decode will be replaced with - the unicode replacement character (U+FFFD). Other possible - values are "strict", "ignore" and "xmlcharrefreplace" as well - as any other name registered with codecs.register_error that - can handle UnicodeEncodeErrors. - """ - if not ciphertext: - raise ValueError("Ciphertext can't be empty.") - - byte_ciphertext = to_bytes(ciphertext) - - # copy because max_plaintext_length will destroy the buffer - ciphertext_buffer = ffi.new("char[]", byte_ciphertext) - - max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length( - self._session, ciphertext_buffer, len(byte_ciphertext) - ) - self._check_error(max_plaintext_length) - plaintext_buffer = ffi.new("char[]", max_plaintext_length) - # copy because max_plaintext_length will destroy the buffer - ciphertext_buffer = ffi.new("char[]", byte_ciphertext) - - message_index = ffi.new("uint32_t*") - plaintext_length = lib.olm_group_decrypt( - self._session, ciphertext_buffer, len(byte_ciphertext), - plaintext_buffer, max_plaintext_length, - message_index - ) - - self._check_error(plaintext_length) - - plaintext = to_unicode_str( - ffi.unpack(plaintext_buffer, plaintext_length), - errors=unicode_errors - ) - - # clear out copies of the plaintext - lib.memset(plaintext_buffer, 0, max_plaintext_length) - - return plaintext, message_index[0] - - @property - def id(self): - # type: () -> str - """str: A base64 encoded identifier for this session.""" - id_length = lib.olm_inbound_group_session_id_length(self._session) - id_buffer = ffi.new("char[]", id_length) - ret = lib.olm_inbound_group_session_id( - self._session, - id_buffer, - id_length - ) - self._check_error(ret) - return bytes_to_native_str(ffi.unpack(id_buffer, id_length)) - - @property - def first_known_index(self): - # type: () -> int - """int: The first message index we know how to decrypt.""" - return lib.olm_inbound_group_session_first_known_index(self._session) - - def export_session(self, message_index): - # type: (int) -> str - """Export an inbound group session - - Export the base64-encoded ratchet key for this session, at the given - index, in a format which can be used by import_session(). - - Raises OlmGroupSessionError on failure. The error message for the - exception will be: - - * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key - corresponding to the given index (ie, it was sent before the - session key was shared with us) - - Args: - message_index(int): The message index at which the session should - be exported. - """ - - export_length = lib.olm_export_inbound_group_session_length( - self._session) - - export_buffer = ffi.new("char[]", export_length) - ret = lib.olm_export_inbound_group_session( - self._session, - export_buffer, - export_length, - message_index - ) - self._check_error(ret) - export_str = bytes_to_native_str(ffi.unpack(export_buffer, export_length)) - - # clear out copies of the key - lib.memset(export_buffer, 0, export_length) - - return export_str - - @classmethod - def import_session(cls, session_key): - # type: (AnyStr) -> InboundGroupSession - """Create an InboundGroupSession from an exported session key. - - Creates an InboundGroupSession with an previously exported session key, - raises OlmGroupSessionError on failure. The error message for the - exception will be: - - * OLM_INVALID_BASE64 if the session_key is not valid base64 - * OLM_BAD_SESSION_KEY if the session_key is invalid - - Args: - session_key(str): The exported session key with which the inbound - group session will be created - """ - obj = cls.__new__(cls) - - byte_session_key = to_bytearray(session_key) - - try: - ret = lib.olm_import_inbound_group_session( - obj._session, - ffi.from_buffer(byte_session_key), - len(byte_session_key) - ) - obj._check_error(ret) - finally: - # clear out copies of the key - if byte_session_key is not session_key: - for i in range(0, len(byte_session_key)): - byte_session_key[i] = 0 - - return obj - - -class OutboundGroupSession(object): - """Outbound group session for encrypted multiuser communication.""" - - def __new__(cls): - # type: (Type[OutboundGroupSession]) -> OutboundGroupSession - obj = super().__new__(cls) - obj._buf = ffi.new("char[]", lib.olm_outbound_group_session_size()) - obj._session = lib.olm_outbound_group_session(obj._buf) - track_for_finalization( - obj, - obj._session, - _clear_outbound_group_session - ) - return obj - - def __init__(self): - # type: () -> None - """Create a new outbound group session. - - Start a new outbound group session. Raises OlmGroupSessionError on - failure. - """ - if False: # pragma: no cover - self._session = self._session # type: ffi.cdata - - random_length = lib.olm_init_outbound_group_session_random_length( - self._session - ) - random = URANDOM(random_length) - - ret = lib.olm_init_outbound_group_session( - self._session, ffi.from_buffer(random), random_length - ) - self._check_error(ret) - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str(ffi.string( - lib.olm_outbound_group_session_last_error(self._session) - )) - - raise OlmGroupSessionError(last_error) - - def pickle(self, passphrase=""): - # type: (Optional[str]) -> bytes - """Store an outbound group session. - - Stores a group session as a base64 string. Encrypts the session using - the supplied passphrase. Returns a byte object containing the base64 - encoded string of the pickled session. - - Args: - passphrase(str, optional): The passphrase to be used to encrypt - the session. - """ - byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" - pickle_length = lib.olm_pickle_outbound_group_session_length( - self._session) - pickle_buffer = ffi.new("char[]", pickle_length) - - try: - ret = lib.olm_pickle_outbound_group_session( - self._session, - ffi.from_buffer(byte_passphrase), len(byte_passphrase), - pickle_buffer, pickle_length - ) - self._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_passphrase)): - byte_passphrase[i] = 0 - - return ffi.unpack(pickle_buffer, pickle_length) - - @classmethod - def from_pickle(cls, pickle, passphrase=""): - # type: (bytes, Optional[str]) -> OutboundGroupSession - """Load a previously stored outbound group session. - - Loads an outbound group session from a pickled base64 string and - returns an OutboundGroupSession object. Decrypts the session using the - supplied passphrase. Raises OlmSessionError on failure. If the - passphrase doesn't match the one used to encrypt the session then the - error message for the exception will be "BAD_ACCOUNT_KEY". If the - base64 couldn't be decoded then the error message will be - "INVALID_BASE64". - - Args: - pickle(bytes): Base64 encoded byte string containing the pickled - session - passphrase(str, optional): The passphrase used to encrypt the - """ - if not pickle: - raise ValueError("Pickle can't be empty") - - byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" - # copy because unpickle will destroy the buffer - pickle_buffer = ffi.new("char[]", pickle) - - obj = cls.__new__(cls) - - try: - ret = lib.olm_unpickle_outbound_group_session( - obj._session, - ffi.from_buffer(byte_passphrase), - len(byte_passphrase), - pickle_buffer, - len(pickle) - ) - obj._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_passphrase)): - byte_passphrase[i] = 0 - - return obj - - def encrypt(self, plaintext): - # type: (AnyStr) -> str - """Encrypt a message. - - Returns the encrypted ciphertext. - - Args: - plaintext(str): A string that will be encrypted using the group - session. - """ - byte_plaintext = to_bytearray(plaintext) - message_length = lib.olm_group_encrypt_message_length( - self._session, len(byte_plaintext) - ) - - message_buffer = ffi.new("char[]", message_length) - - try: - ret = lib.olm_group_encrypt( - self._session, - ffi.from_buffer(byte_plaintext), len(byte_plaintext), - message_buffer, message_length, - ) - self._check_error(ret) - finally: - # clear out copies of plaintext - if byte_plaintext is not plaintext: - for i in range(0, len(byte_plaintext)): - byte_plaintext[i] = 0 - - return bytes_to_native_str(ffi.unpack(message_buffer, message_length)) - - @property - def id(self): - # type: () -> str - """str: A base64 encoded identifier for this session.""" - id_length = lib.olm_outbound_group_session_id_length(self._session) - id_buffer = ffi.new("char[]", id_length) - - ret = lib.olm_outbound_group_session_id( - self._session, - id_buffer, - id_length - ) - self._check_error(ret) - - return bytes_to_native_str(ffi.unpack(id_buffer, id_length)) - - @property - def message_index(self): - # type: () -> int - """int: The current message index of the session. - - Each message is encrypted with an increasing index. This is the index - for the next message. - """ - return lib.olm_outbound_group_session_message_index(self._session) - - @property - def session_key(self): - # type: () -> str - """The base64-encoded current ratchet key for this session. - - Each message is encrypted with a different ratchet key. This function - returns the ratchet key that will be used for the next message. - """ - key_length = lib.olm_outbound_group_session_key_length(self._session) - key_buffer = ffi.new("char[]", key_length) - - ret = lib.olm_outbound_group_session_key( - self._session, - key_buffer, - key_length - ) - self._check_error(ret) - - return bytes_to_native_str(ffi.unpack(key_buffer, key_length)) diff --git a/python/olm/pk.py b/python/olm/pk.py deleted file mode 100644 index 4352359..0000000 --- a/python/olm/pk.py +++ /dev/null @@ -1,461 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""libolm PK module. - -This module contains bindings to the PK part of the Olm library. -It contains two classes PkDecryption and PkEncryption that are used to -establish an encrypted communication channel using public key encryption, -as well as a class PkSigning that is used to sign a message. - -Examples: - >>> decryption = PkDecryption() - >>> encryption = PkEncryption(decryption.public_key) - >>> plaintext = "It's a secret to everybody." - >>> message = encryption.encrypt(plaintext) - >>> decrypted_plaintext = decryption.decrypt(message) - >>> seed = PkSigning.generate_seed() - >>> signing = PkSigning(seed) - >>> signature = signing.sign(plaintext) - >>> ed25519_verify(signing.public_key, plaintext, signature) - -""" - -from builtins import super -from typing import AnyStr, Type - -from future.utils import bytes_to_native_str - -from _libolm import ffi, lib # type: ignore - -from ._compat import URANDOM, to_bytearray, to_unicode_str -from ._finalize import track_for_finalization - - -class PkEncryptionError(Exception): - """libolm Pk encryption exception.""" - - -class PkDecryptionError(Exception): - """libolm Pk decryption exception.""" - - -class PkSigningError(Exception): - """libolm Pk signing exception.""" - - -def _clear_pk_encryption(pk_struct): - lib.olm_clear_pk_encryption(pk_struct) - - -class PkMessage(object): - """A PK encrypted message.""" - - def __init__(self, ephemeral_key, mac, ciphertext): - # type: (str, str, str) -> None - """Create a new PK encrypted message. - - Args: - ephemeral_key(str): the public part of the ephemeral key - used (together with the recipient's key) to generate a symmetric - encryption key. - mac(str): Message Authentication Code of the encrypted message - ciphertext(str): The cipher text of the encrypted message - """ - self.ephemeral_key = ephemeral_key - self.mac = mac - self.ciphertext = ciphertext - - -class PkEncryption(object): - """PkEncryption class. - - Represents the decryption part of a PK encrypted channel. - """ - - def __init__(self, recipient_key): - # type: (AnyStr) -> None - """Create a new PK encryption object. - - Args: - recipient_key(str): a public key that will be used for encryption - """ - if not recipient_key: - raise ValueError("Recipient key can't be empty") - - self._buf = ffi.new("char[]", lib.olm_pk_encryption_size()) - self._pk_encryption = lib.olm_pk_encryption(self._buf) - track_for_finalization(self, self._pk_encryption, _clear_pk_encryption) - - byte_key = to_bytearray(recipient_key) - lib.olm_pk_encryption_set_recipient_key( - self._pk_encryption, - ffi.from_buffer(byte_key), - len(byte_key) - ) - - # clear out copies of the key - if byte_key is not recipient_key: # pragma: no cover - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - def _check_error(self, ret): # pragma: no cover - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str( - ffi.string(lib.olm_pk_encryption_last_error(self._pk_encryption))) - - raise PkEncryptionError(last_error) - - def encrypt(self, plaintext): - # type: (AnyStr) -> PkMessage - """Encrypt a message. - - Returns the encrypted PkMessage. - - Args: - plaintext(str): A string that will be encrypted using the - PkEncryption object. - """ - byte_plaintext = to_bytearray(plaintext) - - r_length = lib.olm_pk_encrypt_random_length(self._pk_encryption) - random = URANDOM(r_length) - random_buffer = ffi.new("char[]", random) - - ciphertext_length = lib.olm_pk_ciphertext_length( - self._pk_encryption, len(byte_plaintext) - ) - ciphertext = ffi.new("char[]", ciphertext_length) - - mac_length = lib.olm_pk_mac_length(self._pk_encryption) - mac = ffi.new("char[]", mac_length) - - ephemeral_key_size = lib.olm_pk_key_length() - ephemeral_key = ffi.new("char[]", ephemeral_key_size) - - ret = lib.olm_pk_encrypt( - self._pk_encryption, - ffi.from_buffer(byte_plaintext), len(byte_plaintext), - ciphertext, ciphertext_length, - mac, mac_length, - ephemeral_key, ephemeral_key_size, - random_buffer, r_length - ) - - try: - self._check_error(ret) - finally: # pragma: no cover - # clear out copies of plaintext - if byte_plaintext is not plaintext: - for i in range(0, len(byte_plaintext)): - byte_plaintext[i] = 0 - - message = PkMessage( - bytes_to_native_str( - ffi.unpack(ephemeral_key, ephemeral_key_size)), - bytes_to_native_str( - ffi.unpack(mac, mac_length)), - bytes_to_native_str( - ffi.unpack(ciphertext, ciphertext_length)) - ) - return message - - -def _clear_pk_decryption(pk_struct): - lib.olm_clear_pk_decryption(pk_struct) - - -class PkDecryption(object): - """PkDecryption class. - - Represents the decryption part of a PK encrypted channel. - - Attributes: - public_key (str): The public key of the PkDecryption object, can be - shared and used to create a PkEncryption object. - - """ - - def __new__(cls): - # type: (Type[PkDecryption]) -> PkDecryption - obj = super().__new__(cls) - obj._buf = ffi.new("char[]", lib.olm_pk_decryption_size()) - obj._pk_decryption = lib.olm_pk_decryption(obj._buf) - obj.public_key = None - track_for_finalization(obj, obj._pk_decryption, _clear_pk_decryption) - return obj - - def __init__(self): - if False: # pragma: no cover - self._pk_decryption = self._pk_decryption # type: ffi.cdata - - random_length = lib.olm_pk_private_key_length() - random = URANDOM(random_length) - random_buffer = ffi.new("char[]", random) - - key_length = lib.olm_pk_key_length() - key_buffer = ffi.new("char[]", key_length) - - ret = lib.olm_pk_key_from_private( - self._pk_decryption, - key_buffer, key_length, - random_buffer, random_length - ) - self._check_error(ret) - self.public_key = bytes_to_native_str(ffi.unpack( - key_buffer, - key_length - )) - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str( - ffi.string(lib.olm_pk_decryption_last_error(self._pk_decryption))) - - raise PkDecryptionError(last_error) - - def pickle(self, passphrase=""): - # type: (str) -> bytes - """Store a PkDecryption object. - - Stores a PkDecryption object as a base64 string. Encrypts the object - using the supplied passphrase. Returns a byte object containing the - base64 encoded string of the pickled session. - - Args: - passphrase(str, optional): The passphrase to be used to encrypt - the object. - """ - byte_key = to_bytearray(passphrase) - - pickle_length = lib.olm_pickle_pk_decryption_length( - self._pk_decryption - ) - pickle_buffer = ffi.new("char[]", pickle_length) - - ret = lib.olm_pickle_pk_decryption( - self._pk_decryption, - ffi.from_buffer(byte_key), len(byte_key), - pickle_buffer, pickle_length - ) - try: - self._check_error(ret) - finally: - # zero out copies of the passphrase - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - return ffi.unpack(pickle_buffer, pickle_length) - - @classmethod - def from_pickle(cls, pickle, passphrase=""): - # types: (bytes, str) -> PkDecryption - """Restore a previously stored PkDecryption object. - - Creates a PkDecryption object from a pickled base64 string. Decrypts - the pickled object using the supplied passphrase. - Raises PkDecryptionError on failure. If the passphrase - doesn't match the one used to encrypt the session then the error - message for the exception will be "BAD_ACCOUNT_KEY". If the base64 - couldn't be decoded then the error message will be "INVALID_BASE64". - - Args: - pickle(bytes): Base64 encoded byte string containing the pickled - PkDecryption object - passphrase(str, optional): The passphrase used to encrypt the - object - """ - if not pickle: - raise ValueError("Pickle can't be empty") - - byte_key = to_bytearray(passphrase) - pickle_buffer = ffi.new("char[]", pickle) - - pubkey_length = lib.olm_pk_key_length() - pubkey_buffer = ffi.new("char[]", pubkey_length) - - obj = cls.__new__(cls) - - ret = lib.olm_unpickle_pk_decryption( - obj._pk_decryption, - ffi.from_buffer(byte_key), len(byte_key), - pickle_buffer, len(pickle), - pubkey_buffer, pubkey_length) - - try: - obj._check_error(ret) - finally: - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - obj.public_key = bytes_to_native_str(ffi.unpack( - pubkey_buffer, - pubkey_length - )) - - return obj - - def decrypt(self, message, unicode_errors="replace"): - # type (PkMessage, str) -> str - """Decrypt a previously encrypted Pk message. - - Returns the decrypted plaintext. - Raises PkDecryptionError on failure. - - Args: - message(PkMessage): the pk message to decrypt. - unicode_errors(str, optional): The error handling scheme to use for - unicode decoding errors. The default is "replace" meaning that - the character that was unable to decode will be replaced with - the unicode replacement character (U+FFFD). Other possible - values are "strict", "ignore" and "xmlcharrefreplace" as well - as any other name registered with codecs.register_error that - can handle UnicodeEncodeErrors. - """ - ephemeral_key = to_bytearray(message.ephemeral_key) - ephemeral_key_size = len(ephemeral_key) - - mac = to_bytearray(message.mac) - mac_length = len(mac) - - ciphertext = to_bytearray(message.ciphertext) - ciphertext_length = len(ciphertext) - - max_plaintext_length = lib.olm_pk_max_plaintext_length( - self._pk_decryption, - ciphertext_length - ) - plaintext_buffer = ffi.new("char[]", max_plaintext_length) - - ret = lib.olm_pk_decrypt( - self._pk_decryption, - ffi.from_buffer(ephemeral_key), ephemeral_key_size, - ffi.from_buffer(mac), mac_length, - ffi.from_buffer(ciphertext), ciphertext_length, - plaintext_buffer, max_plaintext_length) - self._check_error(ret) - - plaintext = (ffi.unpack( - plaintext_buffer, - ret - )) - - # clear out copies of the plaintext - lib.memset(plaintext_buffer, 0, max_plaintext_length) - - return to_unicode_str(plaintext, errors=unicode_errors) - - -def _clear_pk_signing(pk_struct): - lib.olm_clear_pk_signing(pk_struct) - - -class PkSigning(object): - """PkSigning class. - - Signs messages using public key cryptography. - - Attributes: - public_key (str): The public key of the PkSigning object, can be - shared and used to verify using Utility.ed25519_verify. - - """ - - def __init__(self, seed): - # type: (bytes) -> None - """Create a new signing object. - - Args: - seed(bytes): the seed to use as the private key for signing. The - seed must have the same length as the seeds generated by - PkSigning.generate_seed(). - """ - if not seed: - raise ValueError("seed can't be empty") - - self._buf = ffi.new("char[]", lib.olm_pk_signing_size()) - self._pk_signing = lib.olm_pk_signing(self._buf) - track_for_finalization(self, self._pk_signing, _clear_pk_signing) - - seed_buffer = ffi.new("char[]", seed) - - pubkey_length = lib.olm_pk_signing_public_key_length() - pubkey_buffer = ffi.new("char[]", pubkey_length) - - ret = lib.olm_pk_signing_key_from_seed( - self._pk_signing, - pubkey_buffer, pubkey_length, - seed_buffer, len(seed) - ) - - # zero out copies of the seed - lib.memset(seed_buffer, 0, len(seed)) - - self._check_error(ret) - - self.public_key = bytes_to_native_str( - ffi.unpack(pubkey_buffer, pubkey_length) - ) - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str( - ffi.string(lib.olm_pk_signing_last_error(self._pk_signing))) - - raise PkSigningError(last_error) - - @classmethod - def generate_seed(cls): - # type: () -> bytes - """Generate a random seed. - """ - random_length = lib.olm_pk_signing_seed_length() - random = URANDOM(random_length) - - return random - - def sign(self, message): - # type: (AnyStr) -> str - """Sign a message - - Returns the signature. - Raises PkSigningError on failure. - - Args: - message(str): the message to sign. - """ - bytes_message = to_bytearray(message) - - signature_length = lib.olm_pk_signature_length() - signature_buffer = ffi.new("char[]", signature_length) - - ret = lib.olm_pk_sign( - self._pk_signing, - ffi.from_buffer(bytes_message), len(bytes_message), - signature_buffer, signature_length) - self._check_error(ret) - - return bytes_to_native_str( - ffi.unpack(signature_buffer, signature_length) - ) diff --git a/python/olm/sas.py b/python/olm/sas.py deleted file mode 100644 index cf2a443..0000000 --- a/python/olm/sas.py +++ /dev/null @@ -1,245 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2019 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""libolm SAS module. - -This module contains functions to perform key verification using the Short -Authentication String (SAS) method. - -Examples: - >>> sas = Sas() - >>> bob_key = "3p7bfXt9wbTTW2HC7OQ1Nz+DQ8hbeGdNrfx+FG+IK08" - >>> message = "Hello world!" - >>> extra_info = "MAC" - >>> sas_alice.set_their_pubkey(bob_key) - >>> sas_alice.calculate_mac(message, extra_info) - >>> sas_alice.generate_bytes(extra_info, 5) - -""" - -from builtins import bytes -from functools import wraps -from typing import Optional - -from future.utils import bytes_to_native_str - -from _libolm import ffi, lib - -from ._compat import URANDOM, to_bytearray, to_bytes -from ._finalize import track_for_finalization - - -def _clear_sas(sas): - # type: (ffi.cdata) -> None - lib.olm_clear_sas(sas) - - -class OlmSasError(Exception): - """libolm Sas error exception.""" - - -class Sas(object): - """libolm Short Authenticaton String (SAS) class.""" - - def __init__(self, other_users_pubkey=None): - # type: (Optional[str]) -> None - """Create a new SAS object. - - Args: - other_users_pubkey(str, optional): The other users public key, this - key is necesary to generate bytes for the authentication string - as well as to calculate the MAC. - - Raises OlmSasError on failure. - - """ - self._buf = ffi.new("char[]", lib.olm_sas_size()) - self._sas = lib.olm_sas(self._buf) - track_for_finalization(self, self._sas, _clear_sas) - - random_length = lib.olm_create_sas_random_length(self._sas) - random = URANDOM(random_length) - - self._create_sas(random, random_length) - - if other_users_pubkey: - self.set_their_pubkey(other_users_pubkey) - - def _create_sas(self, buffer, buffer_length): - self._check_error( - lib.olm_create_sas( - self._sas, - ffi.from_buffer(buffer), - buffer_length - ) - ) - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str( - ffi.string((lib.olm_sas_last_error(self._sas)))) - - raise OlmSasError(last_error) - - @property - def pubkey(self): - # type: () -> str - """Get the public key for the SAS object. - - This returns the public key of the SAS object that can then be shared - with another user to perform the authentication process. - - Raises OlmSasError on failure. - - """ - pubkey_length = lib.olm_sas_pubkey_length(self._sas) - pubkey_buffer = ffi.new("char[]", pubkey_length) - - self._check_error( - lib.olm_sas_get_pubkey(self._sas, pubkey_buffer, pubkey_length) - ) - - return bytes_to_native_str(ffi.unpack(pubkey_buffer, pubkey_length)) - - @property - def other_key_set(self): - # type: () -> bool - """Check if the other user's pubkey has been set. - """ - return lib.olm_sas_is_their_key_set(self._sas) == 1 - - def set_their_pubkey(self, key): - # type: (str) -> None - """Set the public key of the other user. - - This sets the public key of the other user, it needs to be set before - bytes can be generated for the authentication string and a MAC can be - calculated. - - Args: - key (str): The other users public key. - - Raises OlmSasError on failure. - - """ - byte_key = to_bytearray(key) - - self._check_error( - lib.olm_sas_set_their_key( - self._sas, - ffi.from_buffer(byte_key), - len(byte_key) - ) - ) - - def generate_bytes(self, extra_info, length): - # type: (str, int) -> bytes - """Generate bytes to use for the short authentication string. - - Args: - extra_info (str): Extra information to mix in when generating the - bytes. - length (int): The number of bytes to generate. - - Raises OlmSasError if the other users persons public key isn't set or - an internal Olm error happens. - - """ - if length < 1: - raise ValueError("The length needs to be a positive integer value") - - byte_info = to_bytearray(extra_info) - out_buffer = ffi.new("char[]", length) - - self._check_error( - lib.olm_sas_generate_bytes( - self._sas, - ffi.from_buffer(byte_info), - len(byte_info), - out_buffer, - length - ) - ) - - return ffi.unpack(out_buffer, length) - - def calculate_mac(self, message, extra_info): - # type: (str, str) -> str - """Generate a message authentication code based on the shared secret. - - Args: - message (str): The message to produce the authentication code for. - extra_info (str): Extra information to mix in when generating the - MAC - - Raises OlmSasError on failure. - - """ - byte_message = to_bytes(message) - byte_info = to_bytes(extra_info) - - mac_length = lib.olm_sas_mac_length(self._sas) - mac_buffer = ffi.new("char[]", mac_length) - - self._check_error( - lib.olm_sas_calculate_mac( - self._sas, - ffi.from_buffer(byte_message), - len(byte_message), - ffi.from_buffer(byte_info), - len(byte_info), - mac_buffer, - mac_length - ) - ) - return bytes_to_native_str(ffi.unpack(mac_buffer, mac_length)) - - def calculate_mac_long_kdf(self, message, extra_info): - # type: (str, str) -> str - """Generate a message authentication code based on the shared secret. - - This function should not be used unless compatibility with an older - non-tagged Olm version is required. - - Args: - message (str): The message to produce the authentication code for. - extra_info (str): Extra information to mix in when generating the - MAC - - Raises OlmSasError on failure. - - """ - byte_message = to_bytes(message) - byte_info = to_bytes(extra_info) - - mac_length = lib.olm_sas_mac_length(self._sas) - mac_buffer = ffi.new("char[]", mac_length) - - self._check_error( - lib.olm_sas_calculate_mac_long_kdf( - self._sas, - ffi.from_buffer(byte_message), - len(byte_message), - ffi.from_buffer(byte_info), - len(byte_info), - mac_buffer, - mac_length - ) - ) - return bytes_to_native_str(ffi.unpack(mac_buffer, mac_length)) diff --git a/python/olm/session.py b/python/olm/session.py deleted file mode 100644 index 636eb3d..0000000 --- a/python/olm/session.py +++ /dev/null @@ -1,495 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""libolm Session module. - -This module contains the Olm Session part of the Olm library. - -It is used to establish a peer-to-peer encrypted communication channel between -two Olm accounts. - -Examples: - >>> alice = Account() - >>> bob = Account() - >>> bob.generate_one_time_keys(1) - >>> id_key = bob.identity_keys['curve25519'] - >>> one_time = list(bob.one_time_keys["curve25519"].values())[0] - >>> session = OutboundSession(alice, id_key, one_time) - -""" - -# pylint: disable=redefined-builtin,unused-import -from builtins import bytes, super -from typing import AnyStr, Optional, Type - -from future.utils import bytes_to_native_str - -# pylint: disable=no-name-in-module -from _libolm import ffi, lib # type: ignore - -from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str -from ._finalize import track_for_finalization - -# This is imported only for type checking purposes -if False: - from .account import Account # pragma: no cover - - -class OlmSessionError(Exception): - """libolm Session exception.""" - - -class _OlmMessage(object): - def __init__(self, ciphertext, message_type): - # type: (AnyStr, ffi.cdata) -> None - if not ciphertext: - raise ValueError("Ciphertext can't be empty") - - # I don't know why mypy wants a type annotation here nor why AnyStr - # doesn't work - self.ciphertext = ciphertext # type: ignore - self.message_type = message_type - - def __str__(self): - # type: () -> str - type_to_prefix = { - lib.OLM_MESSAGE_TYPE_PRE_KEY: "PRE_KEY", - lib.OLM_MESSAGE_TYPE_MESSAGE: "MESSAGE" - } - - prefix = type_to_prefix[self.message_type] - return "{} {}".format(prefix, self.ciphertext) - - -class OlmPreKeyMessage(_OlmMessage): - """Olm prekey message class - - Prekey messages are used to establish an Olm session. After the first - message exchange the session switches to normal messages - """ - - def __init__(self, ciphertext): - # type: (AnyStr) -> None - """Create a new Olm prekey message with the supplied ciphertext - - Args: - ciphertext(str): The ciphertext of the prekey message. - """ - _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_PRE_KEY) - - def __repr__(self): - # type: () -> str - return "OlmPreKeyMessage({})".format(self.ciphertext) - - -class OlmMessage(_OlmMessage): - """Olm message class""" - - def __init__(self, ciphertext): - # type: (AnyStr) -> None - """Create a new Olm message with the supplied ciphertext - - Args: - ciphertext(str): The ciphertext of the message. - """ - _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_MESSAGE) - - def __repr__(self): - # type: () -> str - return "OlmMessage({})".format(self.ciphertext) - - -def _clear_session(session): - # type: (ffi.cdata) -> None - lib.olm_clear_session(session) - - -class Session(object): - """libolm Session class. - This is an abstract class that can't be instantiated except when unpickling - a previously pickled InboundSession or OutboundSession object with - from_pickle. - """ - - def __new__(cls): - # type: (Type[Session]) -> Session - - obj = super().__new__(cls) - obj._buf = ffi.new("char[]", lib.olm_session_size()) - obj._session = lib.olm_session(obj._buf) - track_for_finalization(obj, obj._session, _clear_session) - return obj - - def __init__(self): - # type: () -> None - if type(self) is Session: - raise TypeError("Session class may not be instantiated.") - - if False: - self._session = self._session # type: ffi.cdata - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str( - ffi.string(lib.olm_session_last_error(self._session))) - - raise OlmSessionError(last_error) - - def pickle(self, passphrase=""): - # type: (Optional[str]) -> bytes - """Store an Olm session. - - Stores a session as a base64 string. Encrypts the session using the - supplied passphrase. Returns a byte object containing the base64 - encoded string of the pickled session. Raises OlmSessionError on - failure. - - Args: - passphrase(str, optional): The passphrase to be used to encrypt - the session. - """ - byte_key = bytearray(passphrase, "utf-8") if passphrase else b"" - - pickle_length = lib.olm_pickle_session_length(self._session) - pickle_buffer = ffi.new("char[]", pickle_length) - - try: - self._check_error( - lib.olm_pickle_session(self._session, - ffi.from_buffer(byte_key), - len(byte_key), - pickle_buffer, pickle_length)) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - return ffi.unpack(pickle_buffer, pickle_length) - - @classmethod - def from_pickle(cls, pickle, passphrase=""): - # type: (bytes, Optional[str]) -> Session - """Load a previously stored Olm session. - - Loads a session from a pickled base64 string and returns a Session - object. Decrypts the session using the supplied passphrase. Raises - OlmSessionError on failure. If the passphrase doesn't match the one - used to encrypt the session then the error message for the - exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded - then the error message will be "INVALID_BASE64". - - Args: - pickle(bytes): Base64 encoded byte string containing the pickled - session - passphrase(str, optional): The passphrase used to encrypt the - session. - """ - if not pickle: - raise ValueError("Pickle can't be empty") - - byte_key = bytearray(passphrase, "utf-8") if passphrase else b"" - # copy because unpickle will destroy the buffer - pickle_buffer = ffi.new("char[]", pickle) - - session = cls.__new__(cls) - - try: - ret = lib.olm_unpickle_session(session._session, - ffi.from_buffer(byte_key), - len(byte_key), - pickle_buffer, - len(pickle)) - session._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - return session - - def encrypt(self, plaintext): - # type: (AnyStr) -> _OlmMessage - """Encrypts a message using the session. Returns the ciphertext as a - base64 encoded string on success. Raises OlmSessionError on failure. - - Args: - plaintext(str): The plaintext message that will be encrypted. - """ - byte_plaintext = to_bytearray(plaintext) - - r_length = lib.olm_encrypt_random_length(self._session) - random = URANDOM(r_length) - - try: - message_type = lib.olm_encrypt_message_type(self._session) - - self._check_error(message_type) - - ciphertext_length = lib.olm_encrypt_message_length( - self._session, len(byte_plaintext) - ) - ciphertext_buffer = ffi.new("char[]", ciphertext_length) - - self._check_error(lib.olm_encrypt( - self._session, - ffi.from_buffer(byte_plaintext), len(byte_plaintext), - ffi.from_buffer(random), r_length, - ciphertext_buffer, ciphertext_length, - )) - finally: - # clear out copies of plaintext - if byte_plaintext is not plaintext: - for i in range(0, len(byte_plaintext)): - byte_plaintext[i] = 0 - - if message_type == lib.OLM_MESSAGE_TYPE_PRE_KEY: - return OlmPreKeyMessage( - bytes_to_native_str(ffi.unpack( - ciphertext_buffer, - ciphertext_length - ))) - elif message_type == lib.OLM_MESSAGE_TYPE_MESSAGE: - return OlmMessage( - bytes_to_native_str(ffi.unpack( - ciphertext_buffer, - ciphertext_length - ))) - else: # pragma: no cover - raise ValueError("Unknown message type") - - def decrypt(self, message, unicode_errors="replace"): - # type: (_OlmMessage, str) -> str - """Decrypts a message using the session. Returns the plaintext string - on success. Raises OlmSessionError on failure. If the base64 couldn't - be decoded then the error message will be "INVALID_BASE64". If the - message is for an unsupported version of the protocol the error message - will be "BAD_MESSAGE_VERSION". If the message couldn't be decoded then - the error message will be "BAD_MESSAGE_FORMAT". If the MAC on the - message was invalid then the error message will be "BAD_MESSAGE_MAC". - - Args: - message(OlmMessage): The Olm message that will be decrypted. It can - be either an OlmPreKeyMessage or an OlmMessage. - unicode_errors(str, optional): The error handling scheme to use for - unicode decoding errors. The default is "replace" meaning that - the character that was unable to decode will be replaced with - the unicode replacement character (U+FFFD). Other possible - values are "strict", "ignore" and "xmlcharrefreplace" as well - as any other name registered with codecs.register_error that - can handle UnicodeEncodeErrors. - """ - if not message.ciphertext: - raise ValueError("Ciphertext can't be empty") - - byte_ciphertext = to_bytes(message.ciphertext) - # make a copy the ciphertext buffer, because - # olm_decrypt_max_plaintext_length wants to destroy something - ciphertext_buffer = ffi.new("char[]", byte_ciphertext) - - max_plaintext_length = lib.olm_decrypt_max_plaintext_length( - self._session, message.message_type, ciphertext_buffer, - len(byte_ciphertext) - ) - self._check_error(max_plaintext_length) - plaintext_buffer = ffi.new("char[]", max_plaintext_length) - - # make a copy the ciphertext buffer, because - # olm_decrypt_max_plaintext_length wants to destroy something - ciphertext_buffer = ffi.new("char[]", byte_ciphertext) - plaintext_length = lib.olm_decrypt( - self._session, message.message_type, - ciphertext_buffer, len(byte_ciphertext), - plaintext_buffer, max_plaintext_length - ) - self._check_error(plaintext_length) - plaintext = to_unicode_str( - ffi.unpack(plaintext_buffer, plaintext_length), - errors=unicode_errors - ) - - # clear out copies of the plaintext - lib.memset(plaintext_buffer, 0, max_plaintext_length) - - return plaintext - - @property - def id(self): - # type: () -> str - """str: An identifier for this session. Will be the same for both - ends of the conversation. - """ - id_length = lib.olm_session_id_length(self._session) - id_buffer = ffi.new("char[]", id_length) - - self._check_error( - lib.olm_session_id(self._session, id_buffer, id_length) - ) - return bytes_to_native_str(ffi.unpack(id_buffer, id_length)) - - def matches(self, message, identity_key=None): - # type: (OlmPreKeyMessage, Optional[AnyStr]) -> bool - """Checks if the PRE_KEY message is for this in-bound session. - This can happen if multiple messages are sent to this session before - this session sends a message in reply. Returns True if the session - matches. Returns False if the session does not match. Raises - OlmSessionError on failure. If the base64 couldn't be decoded then the - error message will be "INVALID_BASE64". If the message was for an - unsupported protocol version then the error message will be - "BAD_MESSAGE_VERSION". If the message couldn't be decoded then then the - error message will be * "BAD_MESSAGE_FORMAT". - - Args: - message(OlmPreKeyMessage): The Olm prekey message that will checked - if it is intended for this session. - identity_key(str, optional): The identity key of the sender. To - check if the message was also sent using this identity key. - """ - if not isinstance(message, OlmPreKeyMessage): - raise TypeError("Matches can only be called with prekey messages.") - - if not message.ciphertext: - raise ValueError("Ciphertext can't be empty") - - ret = None - - byte_ciphertext = to_bytes(message.ciphertext) - # make a copy, because olm_matches_inbound_session(_from) will distroy - # it - message_buffer = ffi.new("char[]", byte_ciphertext) - - if identity_key: - byte_id_key = to_bytes(identity_key) - - ret = lib.olm_matches_inbound_session_from( - self._session, - ffi.from_buffer(byte_id_key), len(byte_id_key), - message_buffer, len(byte_ciphertext) - ) - - else: - ret = lib.olm_matches_inbound_session( - self._session, - message_buffer, len(byte_ciphertext)) - - self._check_error(ret) - - return bool(ret) - - -class InboundSession(Session): - """Inbound Olm session for p2p encrypted communication. - """ - - def __new__(cls, account, message, identity_key=None): - # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> Session - return super().__new__(cls) - - def __init__(self, account, message, identity_key=None): - # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> None - """Create a new inbound Olm session. - - Create a new in-bound session for sending/receiving messages from an - incoming prekey message. Raises OlmSessionError on failure. If the - base64 couldn't be decoded then error message will be "INVALID_BASE64". - If the message was for an unsupported protocol version then - the errror message will be "BAD_MESSAGE_VERSION". If the message - couldn't be decoded then then the error message will be - "BAD_MESSAGE_FORMAT". If the message refers to an unknown one-time - key then the error message will be "BAD_MESSAGE_KEY_ID". - - Args: - account(Account): The Olm Account that will be used to create this - session. - message(OlmPreKeyMessage): The Olm prekey message that will checked - that will be used to create this session. - identity_key(str, optional): The identity key of the sender. To - check if the message was also sent using this identity key. - """ - if not message.ciphertext: - raise ValueError("Ciphertext can't be empty") - - super().__init__() - byte_ciphertext = to_bytes(message.ciphertext) - message_buffer = ffi.new("char[]", byte_ciphertext) - - if identity_key: - byte_id_key = to_bytes(identity_key) - identity_key_buffer = ffi.new("char[]", byte_id_key) - self._check_error(lib.olm_create_inbound_session_from( - self._session, - account._account, - identity_key_buffer, len(byte_id_key), - message_buffer, len(byte_ciphertext) - )) - else: - self._check_error(lib.olm_create_inbound_session( - self._session, - account._account, - message_buffer, len(byte_ciphertext) - )) - - -class OutboundSession(Session): - """Outbound Olm session for p2p encrypted communication.""" - - def __new__(cls, account, identity_key, one_time_key): - # type: (Account, AnyStr, AnyStr) -> Session - return super().__new__(cls) - - def __init__(self, account, identity_key, one_time_key): - # type: (Account, AnyStr, AnyStr) -> None - """Create a new outbound Olm session. - - Creates a new outbound session for sending messages to a given - identity key and one-time key. - - Raises OlmSessionError on failure. If the keys couldn't be decoded as - base64 then the error message will be "INVALID_BASE64". - - Args: - account(Account): The Olm Account that will be used to create this - session. - identity_key(str): The identity key of the person with whom we want - to start the session. - one_time_key(str): A one-time key from the person with whom we want - to start the session. - """ - if not identity_key: - raise ValueError("Identity key can't be empty") - - if not one_time_key: - raise ValueError("One-time key can't be empty") - - super().__init__() - - byte_id_key = to_bytes(identity_key) - byte_one_time = to_bytes(one_time_key) - - session_random_length = lib.olm_create_outbound_session_random_length( - self._session) - - random = URANDOM(session_random_length) - - self._check_error(lib.olm_create_outbound_session( - self._session, - account._account, - ffi.from_buffer(byte_id_key), len(byte_id_key), - ffi.from_buffer(byte_one_time), len(byte_one_time), - ffi.from_buffer(random), session_random_length - )) diff --git a/python/olm/utility.py b/python/olm/utility.py deleted file mode 100644 index bddef38..0000000 --- a/python/olm/utility.py +++ /dev/null @@ -1,151 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""libolm Utility module. - -This module contains utilities for olm. -It only contains the ed25519_verify function for signature verification. - -Examples: - >>> alice = Account() - - >>> message = "Test" - >>> signature = alice.sign(message) - >>> signing_key = alice.identity_keys["ed25519"] - - >>> ed25519_verify(signing_key, message, signature) - -""" - -# pylint: disable=redefined-builtin,unused-import -from typing import AnyStr, Type - -from future.utils import bytes_to_native_str - -# pylint: disable=no-name-in-module -from _libolm import ffi, lib # type: ignore - -from ._compat import to_bytearray, to_bytes -from ._finalize import track_for_finalization - - -def _clear_utility(utility): # pragma: no cover - # type: (ffi.cdata) -> None - lib.olm_clear_utility(utility) - - -class OlmVerifyError(Exception): - """libolm signature verification exception.""" - - -class OlmHashError(Exception): - """libolm hash calculation exception.""" - - -class _Utility(object): - # pylint: disable=too-few-public-methods - """libolm Utility class.""" - - _buf = None - _utility = None - - @classmethod - def _allocate(cls): - # type: (Type[_Utility]) -> None - cls._buf = ffi.new("char[]", lib.olm_utility_size()) - cls._utility = lib.olm_utility(cls._buf) - track_for_finalization(cls, cls._utility, _clear_utility) - - @classmethod - def _check_error(cls, ret, error_class): - # type: (int, Type) -> None - if ret != lib.olm_error(): - return - - raise error_class("{}".format( - ffi.string(lib.olm_utility_last_error( - cls._utility)).decode("utf-8"))) - - @classmethod - def _ed25519_verify(cls, key, message, signature): - # type: (Type[_Utility], AnyStr, AnyStr, AnyStr) -> None - if not cls._utility: - cls._allocate() - - byte_key = to_bytes(key) - byte_message = to_bytearray(message) - byte_signature = to_bytearray(signature) - - try: - ret = lib.olm_ed25519_verify( - cls._utility, - byte_key, - len(byte_key), - ffi.from_buffer(byte_message), - len(byte_message), - ffi.from_buffer(byte_signature), - len(byte_signature) - ) - - cls._check_error(ret, OlmVerifyError) - - finally: - # clear out copies of the message, which may be a plaintext - if byte_message is not message: - for i in range(0, len(byte_message)): - byte_message[i] = 0 - - @classmethod - def _sha256(cls, input): - # type: (Type[_Utility], AnyStr) -> str - if not cls._utility: - cls._allocate() - - byte_input = to_bytes(input) - hash_length = lib.olm_sha256_length(cls._utility) - hash = ffi.new("char[]", hash_length) - - ret = lib.olm_sha256(cls._utility, byte_input, len(byte_input), - hash, hash_length) - - cls._check_error(ret, OlmHashError) - - return bytes_to_native_str(ffi.unpack(hash, hash_length)) - - -def ed25519_verify(key, message, signature): - # type: (AnyStr, AnyStr, AnyStr) -> None - """Verify an ed25519 signature. - - Raises an OlmVerifyError if verification fails. - - Args: - key(str): The ed25519 public key used for signing. - message(str): The signed message. - signature(bytes): The message signature. - """ - return _Utility._ed25519_verify(key, message, signature) - - -def sha256(input_string): - # type: (AnyStr) -> str - """Calculate the SHA-256 hash of the input and encodes it as base64. - - Args: - input_string(str): The input for which the hash will be calculated. - - """ - return _Utility._sha256(input_string) |