From 2a8202e74846d191a321cca1202175af9db6107d Mon Sep 17 00:00:00 2001 From: dec05eba Date: Thu, 5 Nov 2020 01:45:06 +0100 Subject: Convert to sibs project --- python/olm/group_session.py | 532 -------------------------------------------- 1 file changed, 532 deletions(-) delete mode 100644 python/olm/group_session.py (limited to 'python/olm/group_session.py') diff --git a/python/olm/group_session.py b/python/olm/group_session.py deleted file mode 100644 index 5068192..0000000 --- a/python/olm/group_session.py +++ /dev/null @@ -1,532 +0,0 @@ -# -*- coding: utf-8 -*- -# libolm python bindings -# Copyright © 2015-2017 OpenMarket Ltd -# Copyright © 2018 Damir Jelić -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""libolm Group session module. - -This module contains the group session part of the Olm library. It contains two -classes for creating inbound and outbound group sessions. - -Examples: - >>> outbound = OutboundGroupSession() - >>> InboundGroupSession(outbound.session_key) -""" - -# pylint: disable=redefined-builtin,unused-import -from builtins import bytes, super -from typing import AnyStr, Optional, Tuple, Type - -from future.utils import bytes_to_native_str - -# pylint: disable=no-name-in-module -from _libolm import ffi, lib # type: ignore - -from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str -from ._finalize import track_for_finalization - - -def _clear_inbound_group_session(session): - # type: (ffi.cdata) -> None - lib.olm_clear_inbound_group_session(session) - - -def _clear_outbound_group_session(session): - # type: (ffi.cdata) -> None - lib.olm_clear_outbound_group_session(session) - - -class OlmGroupSessionError(Exception): - """libolm Group session error exception.""" - - -class InboundGroupSession(object): - """Inbound group session for encrypted multiuser communication.""" - - def __new__( - cls, # type: Type[InboundGroupSession] - session_key=None # type: Optional[str] - ): - # type: (...) -> InboundGroupSession - obj = super().__new__(cls) - obj._buf = ffi.new("char[]", lib.olm_inbound_group_session_size()) - obj._session = lib.olm_inbound_group_session(obj._buf) - track_for_finalization(obj, obj._session, _clear_inbound_group_session) - return obj - - def __init__(self, session_key): - # type: (AnyStr) -> None - """Create a new inbound group session. - Start a new inbound group session, from a key exported from - an outbound group session. - - Raises OlmGroupSessionError on failure. The error message of the - exception will be "OLM_INVALID_BASE64" if the session key is not valid - base64 and "OLM_BAD_SESSION_KEY" if the session key is invalid. - """ - if False: # pragma: no cover - self._session = self._session # type: ffi.cdata - - byte_session_key = to_bytearray(session_key) - - try: - ret = lib.olm_init_inbound_group_session( - self._session, - ffi.from_buffer(byte_session_key), len(byte_session_key) - ) - finally: - if byte_session_key is not session_key: - for i in range(0, len(byte_session_key)): - byte_session_key[i] = 0 - self._check_error(ret) - - def pickle(self, passphrase=""): - # type: (Optional[str]) -> bytes - """Store an inbound group session. - - Stores a group session as a base64 string. Encrypts the session using - the supplied passphrase. Returns a byte object containing the base64 - encoded string of the pickled session. - - Args: - passphrase(str, optional): The passphrase to be used to encrypt - the session. - """ - byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" - - pickle_length = lib.olm_pickle_inbound_group_session_length( - self._session) - pickle_buffer = ffi.new("char[]", pickle_length) - - try: - ret = lib.olm_pickle_inbound_group_session( - self._session, - ffi.from_buffer(byte_passphrase), len(byte_passphrase), - pickle_buffer, pickle_length - ) - self._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_passphrase)): - byte_passphrase[i] = 0 - - return ffi.unpack(pickle_buffer, pickle_length) - - @classmethod - def from_pickle(cls, pickle, passphrase=""): - # type: (bytes, Optional[str]) -> InboundGroupSession - """Load a previously stored inbound group session. - - Loads an inbound group session from a pickled base64 string and returns - an InboundGroupSession object. Decrypts the session using the supplied - passphrase. Raises OlmSessionError on failure. If the passphrase - doesn't match the one used to encrypt the session then the error - message for the exception will be "BAD_ACCOUNT_KEY". If the base64 - couldn't be decoded then the error message will be "INVALID_BASE64". - - Args: - pickle(bytes): Base64 encoded byte string containing the pickled - session - passphrase(str, optional): The passphrase used to encrypt the - session - """ - if not pickle: - raise ValueError("Pickle can't be empty") - - byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" - # copy because unpickle will destroy the buffer - pickle_buffer = ffi.new("char[]", pickle) - - obj = cls.__new__(cls) - - try: - ret = lib.olm_unpickle_inbound_group_session( - obj._session, - ffi.from_buffer(byte_passphrase), - len(byte_passphrase), - pickle_buffer, - len(pickle) - ) - obj._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_passphrase)): - byte_passphrase[i] = 0 - - return obj - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str(ffi.string( - lib.olm_inbound_group_session_last_error(self._session))) - - raise OlmGroupSessionError(last_error) - - def decrypt(self, ciphertext, unicode_errors="replace"): - # type: (AnyStr, str) -> Tuple[str, int] - """Decrypt a message - - Returns a tuple of the decrypted plain-text and the message index of - the decrypted message or raises OlmGroupSessionError on failure. - On failure the error message of the exception will be: - - * OLM_INVALID_BASE64 if the message is not valid base64 - * OLM_BAD_MESSAGE_VERSION if the message was encrypted with an - unsupported version of the protocol - * OLM_BAD_MESSAGE_FORMAT if the message headers could not be - decoded - * OLM_BAD_MESSAGE_MAC if the message could not be verified - * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key - corresponding to the message's index (i.e., it was sent before - the session key was shared with us) - - Args: - ciphertext(str): Base64 encoded ciphertext containing the encrypted - message - unicode_errors(str, optional): The error handling scheme to use for - unicode decoding errors. The default is "replace" meaning that - the character that was unable to decode will be replaced with - the unicode replacement character (U+FFFD). Other possible - values are "strict", "ignore" and "xmlcharrefreplace" as well - as any other name registered with codecs.register_error that - can handle UnicodeEncodeErrors. - """ - if not ciphertext: - raise ValueError("Ciphertext can't be empty.") - - byte_ciphertext = to_bytes(ciphertext) - - # copy because max_plaintext_length will destroy the buffer - ciphertext_buffer = ffi.new("char[]", byte_ciphertext) - - max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length( - self._session, ciphertext_buffer, len(byte_ciphertext) - ) - self._check_error(max_plaintext_length) - plaintext_buffer = ffi.new("char[]", max_plaintext_length) - # copy because max_plaintext_length will destroy the buffer - ciphertext_buffer = ffi.new("char[]", byte_ciphertext) - - message_index = ffi.new("uint32_t*") - plaintext_length = lib.olm_group_decrypt( - self._session, ciphertext_buffer, len(byte_ciphertext), - plaintext_buffer, max_plaintext_length, - message_index - ) - - self._check_error(plaintext_length) - - plaintext = to_unicode_str( - ffi.unpack(plaintext_buffer, plaintext_length), - errors=unicode_errors - ) - - # clear out copies of the plaintext - lib.memset(plaintext_buffer, 0, max_plaintext_length) - - return plaintext, message_index[0] - - @property - def id(self): - # type: () -> str - """str: A base64 encoded identifier for this session.""" - id_length = lib.olm_inbound_group_session_id_length(self._session) - id_buffer = ffi.new("char[]", id_length) - ret = lib.olm_inbound_group_session_id( - self._session, - id_buffer, - id_length - ) - self._check_error(ret) - return bytes_to_native_str(ffi.unpack(id_buffer, id_length)) - - @property - def first_known_index(self): - # type: () -> int - """int: The first message index we know how to decrypt.""" - return lib.olm_inbound_group_session_first_known_index(self._session) - - def export_session(self, message_index): - # type: (int) -> str - """Export an inbound group session - - Export the base64-encoded ratchet key for this session, at the given - index, in a format which can be used by import_session(). - - Raises OlmGroupSessionError on failure. The error message for the - exception will be: - - * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key - corresponding to the given index (ie, it was sent before the - session key was shared with us) - - Args: - message_index(int): The message index at which the session should - be exported. - """ - - export_length = lib.olm_export_inbound_group_session_length( - self._session) - - export_buffer = ffi.new("char[]", export_length) - ret = lib.olm_export_inbound_group_session( - self._session, - export_buffer, - export_length, - message_index - ) - self._check_error(ret) - export_str = bytes_to_native_str(ffi.unpack(export_buffer, export_length)) - - # clear out copies of the key - lib.memset(export_buffer, 0, export_length) - - return export_str - - @classmethod - def import_session(cls, session_key): - # type: (AnyStr) -> InboundGroupSession - """Create an InboundGroupSession from an exported session key. - - Creates an InboundGroupSession with an previously exported session key, - raises OlmGroupSessionError on failure. The error message for the - exception will be: - - * OLM_INVALID_BASE64 if the session_key is not valid base64 - * OLM_BAD_SESSION_KEY if the session_key is invalid - - Args: - session_key(str): The exported session key with which the inbound - group session will be created - """ - obj = cls.__new__(cls) - - byte_session_key = to_bytearray(session_key) - - try: - ret = lib.olm_import_inbound_group_session( - obj._session, - ffi.from_buffer(byte_session_key), - len(byte_session_key) - ) - obj._check_error(ret) - finally: - # clear out copies of the key - if byte_session_key is not session_key: - for i in range(0, len(byte_session_key)): - byte_session_key[i] = 0 - - return obj - - -class OutboundGroupSession(object): - """Outbound group session for encrypted multiuser communication.""" - - def __new__(cls): - # type: (Type[OutboundGroupSession]) -> OutboundGroupSession - obj = super().__new__(cls) - obj._buf = ffi.new("char[]", lib.olm_outbound_group_session_size()) - obj._session = lib.olm_outbound_group_session(obj._buf) - track_for_finalization( - obj, - obj._session, - _clear_outbound_group_session - ) - return obj - - def __init__(self): - # type: () -> None - """Create a new outbound group session. - - Start a new outbound group session. Raises OlmGroupSessionError on - failure. - """ - if False: # pragma: no cover - self._session = self._session # type: ffi.cdata - - random_length = lib.olm_init_outbound_group_session_random_length( - self._session - ) - random = URANDOM(random_length) - - ret = lib.olm_init_outbound_group_session( - self._session, ffi.from_buffer(random), random_length - ) - self._check_error(ret) - - def _check_error(self, ret): - # type: (int) -> None - if ret != lib.olm_error(): - return - - last_error = bytes_to_native_str(ffi.string( - lib.olm_outbound_group_session_last_error(self._session) - )) - - raise OlmGroupSessionError(last_error) - - def pickle(self, passphrase=""): - # type: (Optional[str]) -> bytes - """Store an outbound group session. - - Stores a group session as a base64 string. Encrypts the session using - the supplied passphrase. Returns a byte object containing the base64 - encoded string of the pickled session. - - Args: - passphrase(str, optional): The passphrase to be used to encrypt - the session. - """ - byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" - pickle_length = lib.olm_pickle_outbound_group_session_length( - self._session) - pickle_buffer = ffi.new("char[]", pickle_length) - - try: - ret = lib.olm_pickle_outbound_group_session( - self._session, - ffi.from_buffer(byte_passphrase), len(byte_passphrase), - pickle_buffer, pickle_length - ) - self._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_passphrase)): - byte_passphrase[i] = 0 - - return ffi.unpack(pickle_buffer, pickle_length) - - @classmethod - def from_pickle(cls, pickle, passphrase=""): - # type: (bytes, Optional[str]) -> OutboundGroupSession - """Load a previously stored outbound group session. - - Loads an outbound group session from a pickled base64 string and - returns an OutboundGroupSession object. Decrypts the session using the - supplied passphrase. Raises OlmSessionError on failure. If the - passphrase doesn't match the one used to encrypt the session then the - error message for the exception will be "BAD_ACCOUNT_KEY". If the - base64 couldn't be decoded then the error message will be - "INVALID_BASE64". - - Args: - pickle(bytes): Base64 encoded byte string containing the pickled - session - passphrase(str, optional): The passphrase used to encrypt the - """ - if not pickle: - raise ValueError("Pickle can't be empty") - - byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b"" - # copy because unpickle will destroy the buffer - pickle_buffer = ffi.new("char[]", pickle) - - obj = cls.__new__(cls) - - try: - ret = lib.olm_unpickle_outbound_group_session( - obj._session, - ffi.from_buffer(byte_passphrase), - len(byte_passphrase), - pickle_buffer, - len(pickle) - ) - obj._check_error(ret) - finally: - # clear out copies of the passphrase - for i in range(0, len(byte_passphrase)): - byte_passphrase[i] = 0 - - return obj - - def encrypt(self, plaintext): - # type: (AnyStr) -> str - """Encrypt a message. - - Returns the encrypted ciphertext. - - Args: - plaintext(str): A string that will be encrypted using the group - session. - """ - byte_plaintext = to_bytearray(plaintext) - message_length = lib.olm_group_encrypt_message_length( - self._session, len(byte_plaintext) - ) - - message_buffer = ffi.new("char[]", message_length) - - try: - ret = lib.olm_group_encrypt( - self._session, - ffi.from_buffer(byte_plaintext), len(byte_plaintext), - message_buffer, message_length, - ) - self._check_error(ret) - finally: - # clear out copies of plaintext - if byte_plaintext is not plaintext: - for i in range(0, len(byte_plaintext)): - byte_plaintext[i] = 0 - - return bytes_to_native_str(ffi.unpack(message_buffer, message_length)) - - @property - def id(self): - # type: () -> str - """str: A base64 encoded identifier for this session.""" - id_length = lib.olm_outbound_group_session_id_length(self._session) - id_buffer = ffi.new("char[]", id_length) - - ret = lib.olm_outbound_group_session_id( - self._session, - id_buffer, - id_length - ) - self._check_error(ret) - - return bytes_to_native_str(ffi.unpack(id_buffer, id_length)) - - @property - def message_index(self): - # type: () -> int - """int: The current message index of the session. - - Each message is encrypted with an increasing index. This is the index - for the next message. - """ - return lib.olm_outbound_group_session_message_index(self._session) - - @property - def session_key(self): - # type: () -> str - """The base64-encoded current ratchet key for this session. - - Each message is encrypted with a different ratchet key. This function - returns the ratchet key that will be used for the next message. - """ - key_length = lib.olm_outbound_group_session_key_length(self._session) - key_buffer = ffi.new("char[]", key_length) - - ret = lib.olm_outbound_group_session_key( - self._session, - key_buffer, - key_length - ) - self._check_error(ret) - - return bytes_to_native_str(ffi.unpack(key_buffer, key_length)) -- cgit v1.2.3