diff options
author | Hubert Chathi <hubert@uhoreg.ca> | 2018-10-19 11:34:11 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-19 11:34:11 -0400 |
commit | 1d880f9711e0f1b084e63221899f7da2e1087e28 (patch) | |
tree | 69ccb82241fa1c007b621c1bb687788138484d2c /python/olm/group_session.py | |
parent | 6e6facba3b4e2beb4e708271f0f23c7bf3840835 (diff) | |
parent | 0ec6a658583ae4d8b8463b20a640a0769b2ac630 (diff) |
Merge pull request #68 from matrix-org/poljar-python
Poljar's improved python bindings
Diffstat (limited to 'python/olm/group_session.py')
-rw-r--r-- | python/olm/group_session.py | 525 |
1 files changed, 525 insertions, 0 deletions
diff --git a/python/olm/group_session.py b/python/olm/group_session.py new file mode 100644 index 0000000..737d9ef --- /dev/null +++ b/python/olm/group_session.py @@ -0,0 +1,525 @@ +# -*- coding: utf-8 -*- +# libolm python bindings +# Copyright © 2015-2017 OpenMarket Ltd +# Copyright © 2018 Damir Jelić <poljar@termina.org.uk> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""libolm Group session module. + +This module contains the group session part of the Olm library. It contains two +classes for creating inbound and outbound group sessions. + +Examples: + >>> outbound = OutboundGroupSession() + >>> InboundGroupSession(outbound.session_key) +""" + +# pylint: disable=redefined-builtin,unused-import +from builtins import bytes, super +from typing import AnyStr, Optional, Tuple, Type + +from future.utils import bytes_to_native_str + +# pylint: disable=no-name-in-module +from _libolm import ffi, lib # type: ignore + +from ._compat import URANDOM, to_bytearray, to_bytes +from ._finalize import track_for_finalization + + +def _clear_inbound_group_session(session): + # type: (ffi.cdata) -> None + lib.olm_clear_inbound_group_session(session) + + +def _clear_outbound_group_session(session): + # type: (ffi.cdata) -> None + lib.olm_clear_outbound_group_session(session) + + +class OlmGroupSessionError(Exception): + """libolm Group session error exception.""" + + +class InboundGroupSession(object): + """Inbound group session for encrypted multiuser communication.""" + + def __new__( + cls, # type: Type[InboundGroupSession] + session_key=None # type: Optional[str] + ): + # type: (...) -> InboundGroupSession + obj = super().__new__(cls) + obj._buf = ffi.new("char[]", lib.olm_inbound_group_session_size()) + obj._session = lib.olm_inbound_group_session(obj._buf) + track_for_finalization(obj, obj._session, _clear_inbound_group_session) + return obj + + def __init__(self, session_key): + # type: (AnyStr) -> None + """Create a new inbound group session. + Start a new inbound group session, from a key exported from + an outbound group session. + + Raises OlmGroupSessionError on failure. The error message of the + exception will be "OLM_INVALID_BASE64" if the session key is not valid + base64 and "OLM_BAD_SESSION_KEY" if the session key is invalid. + """ + if False: # pragma: no cover + self._session = self._session # type: ffi.cdata + + byte_session_key = to_bytearray(session_key) + + try: + ret = lib.olm_init_inbound_group_session( + self._session, + ffi.from_buffer(byte_session_key), len(byte_session_key) + ) + finally: + if byte_session_key is not session_key: + for i in range(0, len(byte_session_key)): + byte_session_key[i] = 0 + self._check_error(ret) + + def pickle(self, passphrase=""): + # type: (Optional[str]) -> bytes + """Store an inbound group session. + + Stores a group session as a base64 string. Encrypts the session using + the supplied passphrase. Returns a byte object containing the base64 + encoded string of the pickled session. + + Args: + passphrase(str, optional): The passphrase to be used to encrypt + the session. + """ + byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" + + pickle_length = lib.olm_pickle_inbound_group_session_length( + self._session) + pickle_buffer = ffi.new("char[]", pickle_length) + + try: + ret = lib.olm_pickle_inbound_group_session( + self._session, + ffi.from_buffer(byte_passphrase), len(byte_passphrase), + pickle_buffer, pickle_length + ) + self._check_error(ret) + finally: + # clear out copies of the passphrase + for i in range(0, len(byte_passphrase)): + byte_passphrase[i] = 0 + + return ffi.unpack(pickle_buffer, pickle_length) + + @classmethod + def from_pickle(cls, pickle, passphrase=""): + # type: (bytes, Optional[str]) -> InboundGroupSession + """Load a previously stored inbound group session. + + Loads an inbound group session from a pickled base64 string and returns + an InboundGroupSession object. Decrypts the session using the supplied + passphrase. Raises OlmSessionError on failure. If the passphrase + doesn't match the one used to encrypt the session then the error + message for the exception will be "BAD_ACCOUNT_KEY". If the base64 + couldn't be decoded then the error message will be "INVALID_BASE64". + + Args: + pickle(bytes): Base64 encoded byte string containing the pickled + session + passphrase(str, optional): The passphrase used to encrypt the + session + """ + if not pickle: + raise ValueError("Pickle can't be empty") + + byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" + # copy because unpickle will destroy the buffer + pickle_buffer = ffi.new("char[]", pickle) + + obj = cls.__new__(cls) + + try: + ret = lib.olm_unpickle_inbound_group_session( + obj._session, + ffi.from_buffer(byte_passphrase), + len(byte_passphrase), + pickle_buffer, + len(pickle) + ) + obj._check_error(ret) + finally: + # clear out copies of the passphrase + for i in range(0, len(byte_passphrase)): + byte_passphrase[i] = 0 + + return obj + + def _check_error(self, ret): + # type: (int) -> None + if ret != lib.olm_error(): + return + + last_error = bytes_to_native_str(ffi.string( + lib.olm_inbound_group_session_last_error(self._session))) + + raise OlmGroupSessionError(last_error) + + def decrypt(self, ciphertext): + # type: (AnyStr) -> Tuple[str, int] + """Decrypt a message + + Returns a tuple of the decrypted plain-text and the message index of + the decrypted message or raises OlmGroupSessionError on failure. + On failure the error message of the exception will be: + + * OLM_INVALID_BASE64 if the message is not valid base64 + * OLM_BAD_MESSAGE_VERSION if the message was encrypted with an + unsupported version of the protocol + * OLM_BAD_MESSAGE_FORMAT if the message headers could not be + decoded + * OLM_BAD_MESSAGE_MAC if the message could not be verified + * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key + corresponding to the message's index (i.e., it was sent before + the session key was shared with us) + + Args: + ciphertext(str): Base64 encoded ciphertext containing the encrypted + message + """ + if not ciphertext: + raise ValueError("Ciphertext can't be empty.") + + byte_ciphertext = to_bytes(ciphertext) + + # copy because max_plaintext_length will destroy the buffer + ciphertext_buffer = ffi.new("char[]", byte_ciphertext) + + max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length( + self._session, ciphertext_buffer, len(byte_ciphertext) + ) + self._check_error(max_plaintext_length) + plaintext_buffer = ffi.new("char[]", max_plaintext_length) + # copy because max_plaintext_length will destroy the buffer + ciphertext_buffer = ffi.new("char[]", byte_ciphertext) + + message_index = ffi.new("uint32_t*") + plaintext_length = lib.olm_group_decrypt( + self._session, ciphertext_buffer, len(byte_ciphertext), + plaintext_buffer, max_plaintext_length, + message_index + ) + + self._check_error(plaintext_length) + + plaintext = bytes_to_native_str(ffi.unpack( + plaintext_buffer, + plaintext_length + )) + + # clear out copies of the plaintext + lib.memset(plaintext_buffer, 0, max_plaintext_length) + + return plaintext, message_index[0] + + @property + def id(self): + # type: () -> str + """str: A base64 encoded identifier for this session.""" + id_length = lib.olm_inbound_group_session_id_length(self._session) + id_buffer = ffi.new("char[]", id_length) + ret = lib.olm_inbound_group_session_id( + self._session, + id_buffer, + id_length + ) + self._check_error(ret) + return bytes_to_native_str(ffi.unpack(id_buffer, id_length)) + + @property + def first_known_index(self): + # type: () -> int + """int: The first message index we know how to decrypt.""" + return lib.olm_inbound_group_session_first_known_index(self._session) + + def export_session(self, message_index): + # type: (int) -> str + """Export an inbound group session + + Export the base64-encoded ratchet key for this session, at the given + index, in a format which can be used by import_session(). + + Raises OlmGroupSessionError on failure. The error message for the + exception will be: + + * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key + corresponding to the given index (ie, it was sent before the + session key was shared with us) + + Args: + message_index(int): The message index at which the session should + be exported. + """ + + export_length = lib.olm_export_inbound_group_session_length( + self._session) + + export_buffer = ffi.new("char[]", export_length) + ret = lib.olm_export_inbound_group_session( + self._session, + export_buffer, + export_length, + message_index + ) + self._check_error(ret) + export_str = bytes_to_native_str(ffi.unpack(export_buffer, export_length)) + + # clear out copies of the key + lib.memset(export_buffer, 0, export_length) + + return export_str + + @classmethod + def import_session(cls, session_key): + # type: (AnyStr) -> InboundGroupSession + """Create an InboundGroupSession from an exported session key. + + Creates an InboundGroupSession with an previously exported session key, + raises OlmGroupSessionError on failure. The error message for the + exception will be: + + * OLM_INVALID_BASE64 if the session_key is not valid base64 + * OLM_BAD_SESSION_KEY if the session_key is invalid + + Args: + session_key(str): The exported session key with which the inbound + group session will be created + """ + obj = cls.__new__(cls) + + byte_session_key = to_bytearray(session_key) + + try: + ret = lib.olm_import_inbound_group_session( + obj._session, + ffi.from_buffer(byte_session_key), + len(byte_session_key) + ) + obj._check_error(ret) + finally: + # clear out copies of the key + if byte_session_key is not session_key: + for i in range(0, len(byte_session_key)): + byte_session_key[i] = 0 + + return obj + + +class OutboundGroupSession(object): + """Outbound group session for encrypted multiuser communication.""" + + def __new__(cls): + # type: (Type[OutboundGroupSession]) -> OutboundGroupSession + obj = super().__new__(cls) + obj._buf = ffi.new("char[]", lib.olm_outbound_group_session_size()) + obj._session = lib.olm_outbound_group_session(obj._buf) + track_for_finalization( + obj, + obj._session, + _clear_outbound_group_session + ) + return obj + + def __init__(self): + # type: () -> None + """Create a new outbound group session. + + Start a new outbound group session. Raises OlmGroupSessionError on + failure. + """ + if False: # pragma: no cover + self._session = self._session # type: ffi.cdata + + random_length = lib.olm_init_outbound_group_session_random_length( + self._session + ) + random = URANDOM(random_length) + + ret = lib.olm_init_outbound_group_session( + self._session, ffi.from_buffer(random), random_length + ) + self._check_error(ret) + + def _check_error(self, ret): + # type: (int) -> None + if ret != lib.olm_error(): + return + + last_error = bytes_to_native_str(ffi.string( + lib.olm_outbound_group_session_last_error(self._session) + )) + + raise OlmGroupSessionError(last_error) + + def pickle(self, passphrase=""): + # type: (Optional[str]) -> bytes + """Store an outbound group session. + + Stores a group session as a base64 string. Encrypts the session using + the supplied passphrase. Returns a byte object containing the base64 + encoded string of the pickled session. + + Args: + passphrase(str, optional): The passphrase to be used to encrypt + the session. + """ + byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" + pickle_length = lib.olm_pickle_outbound_group_session_length( + self._session) + pickle_buffer = ffi.new("char[]", pickle_length) + + try: + ret = lib.olm_pickle_outbound_group_session( + self._session, + ffi.from_buffer(byte_passphrase), len(byte_passphrase), + pickle_buffer, pickle_length + ) + self._check_error(ret) + finally: + # clear out copies of the passphrase + for i in range(0, len(byte_passphrase)): + byte_passphrase[i] = 0 + + return ffi.unpack(pickle_buffer, pickle_length) + + @classmethod + def from_pickle(cls, pickle, passphrase=""): + # type: (bytes, Optional[str]) -> OutboundGroupSession + """Load a previously stored outbound group session. + + Loads an outbound group session from a pickled base64 string and + returns an OutboundGroupSession object. Decrypts the session using the + supplied passphrase. Raises OlmSessionError on failure. If the + passphrase doesn't match the one used to encrypt the session then the + error message for the exception will be "BAD_ACCOUNT_KEY". If the + base64 couldn't be decoded then the error message will be + "INVALID_BASE64". + + Args: + pickle(bytes): Base64 encoded byte string containing the pickled + session + passphrase(str, optional): The passphrase used to encrypt the + """ + if not pickle: + raise ValueError("Pickle can't be empty") + + byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" + # copy because unpickle will destroy the buffer + pickle_buffer = ffi.new("char[]", pickle) + + obj = cls.__new__(cls) + + try: + ret = lib.olm_unpickle_outbound_group_session( + obj._session, + ffi.from_buffer(byte_passphrase), + len(byte_passphrase), + pickle_buffer, + len(pickle) + ) + obj._check_error(ret) + finally: + # clear out copies of the passphrase + for i in range(0, len(byte_passphrase)): + byte_passphrase[i] = 0 + + return obj + + def encrypt(self, plaintext): + # type: (AnyStr) -> str + """Encrypt a message. + + Returns the encrypted ciphertext. + + Args: + plaintext(str): A string that will be encrypted using the group + session. + """ + byte_plaintext = to_bytearray(plaintext) + message_length = lib.olm_group_encrypt_message_length( + self._session, len(byte_plaintext) + ) + + message_buffer = ffi.new("char[]", message_length) + + try: + ret = lib.olm_group_encrypt( + self._session, + ffi.from_buffer(byte_plaintext), len(byte_plaintext), + message_buffer, message_length, + ) + self._check_error(ret) + finally: + # clear out copies of plaintext + if byte_plaintext is not plaintext: + for i in range(0, len(byte_plaintext)): + byte_plaintext[i] = 0 + + return bytes_to_native_str(ffi.unpack(message_buffer, message_length)) + + @property + def id(self): + # type: () -> str + """str: A base64 encoded identifier for this session.""" + id_length = lib.olm_outbound_group_session_id_length(self._session) + id_buffer = ffi.new("char[]", id_length) + + ret = lib.olm_outbound_group_session_id( + self._session, + id_buffer, + id_length + ) + self._check_error(ret) + + return bytes_to_native_str(ffi.unpack(id_buffer, id_length)) + + @property + def message_index(self): + # type: () -> int + """int: The current message index of the session. + + Each message is encrypted with an increasing index. This is the index + for the next message. + """ + return lib.olm_outbound_group_session_message_index(self._session) + + @property + def session_key(self): + # type: () -> str + """The base64-encoded current ratchet key for this session. + + Each message is encrypted with a different ratchet key. This function + returns the ratchet key that will be used for the next message. + """ + key_length = lib.olm_outbound_group_session_key_length(self._session) + key_buffer = ffi.new("char[]", key_length) + + ret = lib.olm_outbound_group_session_key( + self._session, + key_buffer, + key_length + ) + self._check_error(ret) + + return bytes_to_native_str(ffi.unpack(key_buffer, key_length)) |