diff options
author | dec05eba <dec05eba@protonmail.com> | 2020-11-05 01:45:06 +0100 |
---|---|---|
committer | dec05eba <dec05eba@protonmail.com> | 2020-11-05 01:45:06 +0100 |
commit | 2a8202e74846d191a321cca1202175af9db6107d (patch) | |
tree | a6f455caf07da1186851f343a237a4c4e4484f46 /python/olm/session.py | |
parent | 8efa0ec17d8c262f9c3fd7603e8074f74a053708 (diff) |
Diffstat (limited to 'python/olm/session.py')
-rw-r--r-- | python/olm/session.py | 495 |
1 files changed, 0 insertions, 495 deletions
diff --git a/python/olm/session.py b/python/olm/session.py deleted file mode 100644 index 636eb3d..0000000 --- a/python/olm/session.py +++ /dev/null @@ -1,495 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""libolm Session module. - -This module contains the Olm Session part of the Olm library. - -It is used to establish a peer-to-peer encrypted communication channel between -two Olm accounts. - -Examples: - >>> alice = Account() - >>> bob = Account() - >>> bob.generate_one_time_keys(1) - >>> id_key = bob.identity_keys['curve25519'] - >>> one_time = list(bob.one_time_keys["curve25519"].values())[0] - >>> session = OutboundSession(alice, id_key, one_time) - -""" - -# pylint: disable=redefined-builtin,unused-import -from builtins import bytes, super -from typing import AnyStr, Optional, Type - -from future.utils import bytes_to_native_str - -# pylint: disable=no-name-in-module -from _libolm import ffi, lib # type: ignore - -from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str -from ._finalize import track_for_finalization - -# This is imported only for type checking purposes -if False: - from .account import Account # pragma: no cover - - -class OlmSessionError(Exception): - """libolm Session exception.""" - - -class _OlmMessage(object): - def __init__(self, ciphertext, message_type): - # type: (AnyStr, ffi.cdata) -> None - if not ciphertext: - raise ValueError("Ciphertext can't be empty") - - # I don't know why mypy wants a type annotation here nor why AnyStr - # doesn't work - self.ciphertext = ciphertext # type: ignore - self.message_type = message_type - - def __str__(self): - # type: () -> str - type_to_prefix = { - lib.OLM_MESSAGE_TYPE_PRE_KEY: "PRE_KEY", - lib.OLM_MESSAGE_TYPE_MESSAGE: "MESSAGE" - } - - prefix = type_to_prefix[self.message_type] - return "{} {}".format(prefix, self.ciphertext) - - -class OlmPreKeyMessage(_OlmMessage): - """Olm prekey message class - - Prekey messages are used to establish an Olm session. After the first - message exchange the session switches to normal messages - """ - - def __init__(self, ciphertext): - # type: (AnyStr) -> None - """Create a new Olm prekey message with the supplied ciphertext - - Args: - ciphertext(str): The ciphertext of the prekey message. - """ - _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_PRE_KEY) - - def __repr__(self): - # type: () -> str - return "OlmPreKeyMessage({})".format(self.ciphertext) - - -class OlmMessage(_OlmMessage): - """Olm message class""" - - def __init__(self, ciphertext): - # type: (AnyStr) -> None - """Create a new Olm message with the supplied ciphertext - - Args: - ciphertext(str): The ciphertext of the message. - """ - _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_MESSAGE) - - def __repr__(self): - # type: () -> str - return "OlmMessage({})".format(self.ciphertext) - - -def _clear_session(session): - # type: (ffi.cdata) -> None - lib.olm_clear_session(session) - - -class Session(object): - """libolm Session class. - This is an abstract class that can't be instantiated except when unpickling - a previously pickled InboundSession or OutboundSession object with - from_pickle. - """ - - def __new__(cls): - # type: (Type[Session]) -> Session - - obj = super().__new__(cls) - obj._buf = ffi.new("char[]", lib.olm_session_size()) - obj._session = lib.olm_session(obj._buf) - track_for_finalization(obj, obj._session, _clear_session) - return obj - - def __init__(self): - # type: () -> None - if type(self) is Session: - raise TypeError("Session class may not be instantiated.") - - if False: - self._session = self._session # type: ffi.cdata - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str( - ffi.string(lib.olm_session_last_error(self._session))) - - raise OlmSessionError(last_error) - - def pickle(self, passphrase=""): - # type: (Optional[str]) -> bytes - """Store an Olm session. - - Stores a session as a base64 string. Encrypts the session using the - supplied passphrase. Returns a byte object containing the base64 - encoded string of the pickled session. Raises OlmSessionError on - failure. - - Args: - passphrase(str, optional): The passphrase to be used to encrypt - the session. - """ - byte_key = bytearray(passphrase, "utf-8") if passphrase else b"" - - pickle_length = lib.olm_pickle_session_length(self._session) - pickle_buffer = ffi.new("char[]", pickle_length) - - try: - self._check_error( - lib.olm_pickle_session(self._session, - ffi.from_buffer(byte_key), - len(byte_key), - pickle_buffer, pickle_length)) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - return ffi.unpack(pickle_buffer, pickle_length) - - @classmethod - def from_pickle(cls, pickle, passphrase=""): - # type: (bytes, Optional[str]) -> Session - """Load a previously stored Olm session. - - Loads a session from a pickled base64 string and returns a Session - object. Decrypts the session using the supplied passphrase. Raises - OlmSessionError on failure. If the passphrase doesn't match the one - used to encrypt the session then the error message for the - exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded - then the error message will be "INVALID_BASE64". - - Args: - pickle(bytes): Base64 encoded byte string containing the pickled - session - passphrase(str, optional): The passphrase used to encrypt the - session. - """ - if not pickle: - raise ValueError("Pickle can't be empty") - - byte_key = bytearray(passphrase, "utf-8") if passphrase else b"" - # copy because unpickle will destroy the buffer - pickle_buffer = ffi.new("char[]", pickle) - - session = cls.__new__(cls) - - try: - ret = lib.olm_unpickle_session(session._session, - ffi.from_buffer(byte_key), - len(byte_key), - pickle_buffer, - len(pickle)) - session._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_key)): - byte_key[i] = 0 - - return session - - def encrypt(self, plaintext): - # type: (AnyStr) -> _OlmMessage - """Encrypts a message using the session. Returns the ciphertext as a - base64 encoded string on success. Raises OlmSessionError on failure. - - Args: - plaintext(str): The plaintext message that will be encrypted. - """ - byte_plaintext = to_bytearray(plaintext) - - r_length = lib.olm_encrypt_random_length(self._session) - random = URANDOM(r_length) - - try: - message_type = lib.olm_encrypt_message_type(self._session) - - self._check_error(message_type) - - ciphertext_length = lib.olm_encrypt_message_length( - self._session, len(byte_plaintext) - ) - ciphertext_buffer = ffi.new("char[]", ciphertext_length) - - self._check_error(lib.olm_encrypt( - self._session, - ffi.from_buffer(byte_plaintext), len(byte_plaintext), - ffi.from_buffer(random), r_length, - ciphertext_buffer, ciphertext_length, - )) - finally: - # clear out copies of plaintext - if byte_plaintext is not plaintext: - for i in range(0, len(byte_plaintext)): - byte_plaintext[i] = 0 - - if message_type == lib.OLM_MESSAGE_TYPE_PRE_KEY: - return OlmPreKeyMessage( - bytes_to_native_str(ffi.unpack( - ciphertext_buffer, - ciphertext_length - ))) - elif message_type == lib.OLM_MESSAGE_TYPE_MESSAGE: - return OlmMessage( - bytes_to_native_str(ffi.unpack( - ciphertext_buffer, - ciphertext_length - ))) - else: # pragma: no cover - raise ValueError("Unknown message type") - - def decrypt(self, message, unicode_errors="replace"): - # type: (_OlmMessage, str) -> str - """Decrypts a message using the session. Returns the plaintext string - on success. Raises OlmSessionError on failure. If the base64 couldn't - be decoded then the error message will be "INVALID_BASE64". If the - message is for an unsupported version of the protocol the error message - will be "BAD_MESSAGE_VERSION". If the message couldn't be decoded then - the error message will be "BAD_MESSAGE_FORMAT". If the MAC on the - message was invalid then the error message will be "BAD_MESSAGE_MAC". - - Args: - message(OlmMessage): The Olm message that will be decrypted. It can - be either an OlmPreKeyMessage or an OlmMessage. - unicode_errors(str, optional): The error handling scheme to use for - unicode decoding errors. The default is "replace" meaning that - the character that was unable to decode will be replaced with - the unicode replacement character (U+FFFD). Other possible - values are "strict", "ignore" and "xmlcharrefreplace" as well - as any other name registered with codecs.register_error that - can handle UnicodeEncodeErrors. - """ - if not message.ciphertext: - raise ValueError("Ciphertext can't be empty") - - byte_ciphertext = to_bytes(message.ciphertext) - # make a copy the ciphertext buffer, because - # olm_decrypt_max_plaintext_length wants to destroy something - ciphertext_buffer = ffi.new("char[]", byte_ciphertext) - - max_plaintext_length = lib.olm_decrypt_max_plaintext_length( - self._session, message.message_type, ciphertext_buffer, - len(byte_ciphertext) - ) - self._check_error(max_plaintext_length) - plaintext_buffer = ffi.new("char[]", max_plaintext_length) - - # make a copy the ciphertext buffer, because - # olm_decrypt_max_plaintext_length wants to destroy something - ciphertext_buffer = ffi.new("char[]", byte_ciphertext) - plaintext_length = lib.olm_decrypt( - self._session, message.message_type, - ciphertext_buffer, len(byte_ciphertext), - plaintext_buffer, max_plaintext_length - ) - self._check_error(plaintext_length) - plaintext = to_unicode_str( - ffi.unpack(plaintext_buffer, plaintext_length), - errors=unicode_errors - ) - - # clear out copies of the plaintext - lib.memset(plaintext_buffer, 0, max_plaintext_length) - - return plaintext - - @property - def id(self): - # type: () -> str - """str: An identifier for this session. Will be the same for both - ends of the conversation. - """ - id_length = lib.olm_session_id_length(self._session) - id_buffer = ffi.new("char[]", id_length) - - self._check_error( - lib.olm_session_id(self._session, id_buffer, id_length) - ) - return bytes_to_native_str(ffi.unpack(id_buffer, id_length)) - - def matches(self, message, identity_key=None): - # type: (OlmPreKeyMessage, Optional[AnyStr]) -> bool - """Checks if the PRE_KEY message is for this in-bound session. - This can happen if multiple messages are sent to this session before - this session sends a message in reply. Returns True if the session - matches. Returns False if the session does not match. Raises - OlmSessionError on failure. If the base64 couldn't be decoded then the - error message will be "INVALID_BASE64". If the message was for an - unsupported protocol version then the error message will be - "BAD_MESSAGE_VERSION". If the message couldn't be decoded then then the - error message will be * "BAD_MESSAGE_FORMAT". - - Args: - message(OlmPreKeyMessage): The Olm prekey message that will checked - if it is intended for this session. - identity_key(str, optional): The identity key of the sender. To - check if the message was also sent using this identity key. - """ - if not isinstance(message, OlmPreKeyMessage): - raise TypeError("Matches can only be called with prekey messages.") - - if not message.ciphertext: - raise ValueError("Ciphertext can't be empty") - - ret = None - - byte_ciphertext = to_bytes(message.ciphertext) - # make a copy, because olm_matches_inbound_session(_from) will distroy - # it - message_buffer = ffi.new("char[]", byte_ciphertext) - - if identity_key: - byte_id_key = to_bytes(identity_key) - - ret = lib.olm_matches_inbound_session_from( - self._session, - ffi.from_buffer(byte_id_key), len(byte_id_key), - message_buffer, len(byte_ciphertext) - ) - - else: - ret = lib.olm_matches_inbound_session( - self._session, - message_buffer, len(byte_ciphertext)) - - self._check_error(ret) - - return bool(ret) - - -class InboundSession(Session): - """Inbound Olm session for p2p encrypted communication. - """ - - def __new__(cls, account, message, identity_key=None): - # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> Session - return super().__new__(cls) - - def __init__(self, account, message, identity_key=None): - # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> None - """Create a new inbound Olm session. - - Create a new in-bound session for sending/receiving messages from an - incoming prekey message. Raises OlmSessionError on failure. If the - base64 couldn't be decoded then error message will be "INVALID_BASE64". - If the message was for an unsupported protocol version then - the errror message will be "BAD_MESSAGE_VERSION". If the message - couldn't be decoded then then the error message will be - "BAD_MESSAGE_FORMAT". If the message refers to an unknown one-time - key then the error message will be "BAD_MESSAGE_KEY_ID". - - Args: - account(Account): The Olm Account that will be used to create this - session. - message(OlmPreKeyMessage): The Olm prekey message that will checked - that will be used to create this session. - identity_key(str, optional): The identity key of the sender. To - check if the message was also sent using this identity key. - """ - if not message.ciphertext: - raise ValueError("Ciphertext can't be empty") - - super().__init__() - byte_ciphertext = to_bytes(message.ciphertext) - message_buffer = ffi.new("char[]", byte_ciphertext) - - if identity_key: - byte_id_key = to_bytes(identity_key) - identity_key_buffer = ffi.new("char[]", byte_id_key) - self._check_error(lib.olm_create_inbound_session_from( - self._session, - account._account, - identity_key_buffer, len(byte_id_key), - message_buffer, len(byte_ciphertext) - )) - else: - self._check_error(lib.olm_create_inbound_session( - self._session, - account._account, - message_buffer, len(byte_ciphertext) - )) - - -class OutboundSession(Session): - """Outbound Olm session for p2p encrypted communication.""" - - def __new__(cls, account, identity_key, one_time_key): - # type: (Account, AnyStr, AnyStr) -> Session - return super().__new__(cls) - - def __init__(self, account, identity_key, one_time_key): - # type: (Account, AnyStr, AnyStr) -> None - """Create a new outbound Olm session. - - Creates a new outbound session for sending messages to a given - identity key and one-time key. - - Raises OlmSessionError on failure. If the keys couldn't be decoded as - base64 then the error message will be "INVALID_BASE64". - - Args: - account(Account): The Olm Account that will be used to create this - session. - identity_key(str): The identity key of the person with whom we want - to start the session. - one_time_key(str): A one-time key from the person with whom we want - to start the session. - """ - if not identity_key: - raise ValueError("Identity key can't be empty") - - if not one_time_key: - raise ValueError("One-time key can't be empty") - - super().__init__() - - byte_id_key = to_bytes(identity_key) - byte_one_time = to_bytes(one_time_key) - - session_random_length = lib.olm_create_outbound_session_random_length( - self._session) - - random = URANDOM(session_random_length) - - self._check_error(lib.olm_create_outbound_session( - self._session, - account._account, - ffi.from_buffer(byte_id_key), len(byte_id_key), - ffi.from_buffer(byte_one_time), len(byte_one_time), - ffi.from_buffer(random), session_random_length - )) |