aboutsummaryrefslogtreecommitdiff
path: root/python/olm
diff options
context:
space:
mode:
Diffstat (limited to 'python/olm')
-rw-r--r--python/olm/__init__.py48
-rw-r--r--python/olm/__version__.py9
-rw-r--r--python/olm/_compat.py67
-rw-r--r--python/olm/_finalize.py65
-rw-r--r--python/olm/account.py271
-rw-r--r--python/olm/group_session.py532
-rw-r--r--python/olm/pk.py461
-rw-r--r--python/olm/sas.py245
-rw-r--r--python/olm/session.py495
-rw-r--r--python/olm/utility.py151
10 files changed, 0 insertions, 2344 deletions
diff --git a/python/olm/__init__.py b/python/olm/__init__.py
deleted file mode 100644
index 26257a5..0000000
--- a/python/olm/__init__.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2015-2017 OpenMarket Ltd
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-Olm Python bindings
-~~~~~~~~~~~~~~~~~~~~~
-| This package implements python bindings for the libolm C library.
-| © Copyright 2015-2017 by OpenMarket Ltd
-| © Copyright 2018 by Damir Jelić
-"""
-from .utility import ed25519_verify, OlmVerifyError, OlmHashError, sha256
-from .account import Account, OlmAccountError
-from .session import (
- Session,
- InboundSession,
- OutboundSession,
- OlmSessionError,
- OlmMessage,
- OlmPreKeyMessage
-)
-from .group_session import (
- InboundGroupSession,
- OutboundGroupSession,
- OlmGroupSessionError
-)
-from .pk import (
- PkMessage,
- PkEncryption,
- PkDecryption,
- PkSigning,
- PkEncryptionError,
- PkDecryptionError,
- PkSigningError
-)
-from .sas import Sas, OlmSasError
diff --git a/python/olm/__version__.py b/python/olm/__version__.py
deleted file mode 100644
index 498f89f..0000000
--- a/python/olm/__version__.py
+++ /dev/null
@@ -1,9 +0,0 @@
-__title__ = "python-olm"
-__description__ = ("python CFFI bindings for the olm "
- "cryptographic ratchet library")
-__url__ = "https://github.com/poljar/python-olm"
-__version__ = "3.2.1"
-__author__ = "Damir Jelić"
-__author_email__ = "poljar@termina.org.uk"
-__license__ = "Apache 2.0"
-__copyright__ = "Copyright 2018-2019 Damir Jelić"
diff --git a/python/olm/_compat.py b/python/olm/_compat.py
deleted file mode 100644
index 2ceaa33..0000000
--- a/python/olm/_compat.py
+++ /dev/null
@@ -1,67 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2015-2017 OpenMarket Ltd
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from builtins import bytes, str
-from typing import AnyStr
-
-try:
- import secrets
- URANDOM = secrets.token_bytes # pragma: no cover
-except ImportError: # pragma: no cover
- from os import urandom
- URANDOM = urandom # type: ignore
-
-
-def to_bytearray(string):
- # type: (AnyStr) -> bytes
- if isinstance(string, bytes):
- return bytearray(string)
- elif isinstance(string, str):
- return bytearray(string, "utf-8")
-
- raise TypeError("Invalid type {}".format(type(string)))
-
-
-def to_bytes(string):
- # type: (AnyStr) -> bytes
- if isinstance(string, bytes):
- return string
- elif isinstance(string, str):
- return bytes(string, "utf-8")
-
- raise TypeError("Invalid type {}".format(type(string)))
-
-
-def to_unicode_str(byte_string, errors="replace"):
- """Turn a byte string into a unicode string.
-
- Should be used everywhere where the input byte string might not be trusted
- and may contain invalid unicode values.
-
- Args:
- byte_string (bytes): The bytestring that will be converted to a native
- string.
- errors (str, optional): The error handling scheme that should be used
- to handle unicode decode errors. Can be one of "strict" (raise an
- UnicodeDecodeError exception, "ignore" (remove the offending
- characters), "replace" (replace the offending character with
- U+FFFD), "xmlcharrefreplace" as well as any other name registered
- with codecs.register_error that can handle UnicodeEncodeErrors.
-
- Returns the decoded native string.
- """
- return byte_string.decode(encoding="utf-8", errors=errors)
diff --git a/python/olm/_finalize.py b/python/olm/_finalize.py
deleted file mode 100644
index 9f467bc..0000000
--- a/python/olm/_finalize.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# The MIT License (MIT)
-# Copyright (c) 2010 Benjamin Peterson <benjamin@python.org>
-
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
-# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
-# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
-# OR OTHER DEALINGS IN THE SOFTWARE.
-
-"""Finalization with weakrefs
-
-This is designed for avoiding __del__.
-"""
-from __future__ import print_function
-
-import sys
-import traceback
-import weakref
-
-__author__ = "Benjamin Peterson <benjamin@python.org>"
-
-
-class OwnerRef(weakref.ref):
- """A simple weakref.ref subclass, so attributes can be added."""
- pass
-
-
-def _run_finalizer(ref):
- """Internal weakref callback to run finalizers"""
- del _finalize_refs[id(ref)]
- finalizer = ref.finalizer
- item = ref.item
- try:
- finalizer(item)
- except Exception: # pragma: no cover
- print("Exception running {}:".format(finalizer), file=sys.stderr)
- traceback.print_exc()
-
-
-_finalize_refs = {}
-
-
-def track_for_finalization(owner, item, finalizer):
- """Register an object for finalization.
-
- ``owner`` is the the object which is responsible for ``item``.
- ``finalizer`` will be called with ``item`` as its only argument when
- ``owner`` is destroyed by the garbage collector.
- """
- ref = OwnerRef(owner, _run_finalizer)
- ref.item = item
- ref.finalizer = finalizer
- _finalize_refs[id(ref)] = ref
diff --git a/python/olm/account.py b/python/olm/account.py
deleted file mode 100644
index 8455655..0000000
--- a/python/olm/account.py
+++ /dev/null
@@ -1,271 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2015-2017 OpenMarket Ltd
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""libolm Account module.
-
-This module contains the account part of the Olm library. It contains a single
-Account class which handles the creation of new accounts as well as the storing
-and restoring of them.
-
-Examples:
- >>> acc = Account()
- >>> account.identity_keys()
- >>> account.generate_one_time_keys(1)
-
-"""
-
-import json
-# pylint: disable=redefined-builtin,unused-import
-from builtins import bytes, super
-from typing import AnyStr, Dict, Optional, Type
-
-from future.utils import bytes_to_native_str
-
-# pylint: disable=no-name-in-module
-from _libolm import ffi, lib # type: ignore
-
-from ._compat import URANDOM, to_bytearray
-from ._finalize import track_for_finalization
-
-# This is imported only for type checking purposes
-if False:
- from .session import Session # pragma: no cover
-
-
-def _clear_account(account):
- # type: (ffi.cdata) -> None
- lib.olm_clear_account(account)
-
-
-class OlmAccountError(Exception):
- """libolm Account error exception."""
-
-
-class Account(object):
- """libolm Account class."""
-
- def __new__(cls):
- # type: (Type[Account]) -> Account
- obj = super().__new__(cls)
- obj._buf = ffi.new("char[]", lib.olm_account_size())
- obj._account = lib.olm_account(obj._buf)
- track_for_finalization(obj, obj._account, _clear_account)
- return obj
-
- def __init__(self):
- # type: () -> None
- """Create a new Olm account.
-
- Creates a new account and its matching identity key pair.
-
- Raises OlmAccountError on failure. If there weren't enough random bytes
- for the account creation the error message for the exception will be
- NOT_ENOUGH_RANDOM.
- """
- # This is needed to silence mypy not knowing the type of _account.
- # There has to be a better way for this.
- if False: # pragma: no cover
- self._account = self._account # type: ffi.cdata
-
- random_length = lib.olm_create_account_random_length(self._account)
- random = URANDOM(random_length)
-
- self._check_error(
- lib.olm_create_account(self._account, ffi.from_buffer(random),
- random_length))
-
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(
- ffi.string((lib.olm_account_last_error(self._account))))
-
- raise OlmAccountError(last_error)
-
- def pickle(self, passphrase=""):
- # type: (Optional[str]) -> bytes
- """Store an Olm account.
-
- Stores an account as a base64 string. Encrypts the account using the
- supplied passphrase. Returns a byte object containing the base64
- encoded string of the pickled account. Raises OlmAccountError on
- failure.
-
- Args:
- passphrase(str, optional): The passphrase to be used to encrypt
- the account.
- """
- byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
-
- pickle_length = lib.olm_pickle_account_length(self._account)
- pickle_buffer = ffi.new("char[]", pickle_length)
-
- try:
- self._check_error(
- lib.olm_pickle_account(self._account,
- ffi.from_buffer(byte_key),
- len(byte_key),
- pickle_buffer,
- pickle_length))
- finally:
- # zero out copies of the passphrase
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- return ffi.unpack(pickle_buffer, pickle_length)
-
- @classmethod
- def from_pickle(cls, pickle, passphrase=""):
- # type: (bytes, Optional[str]) -> Account
- """Load a previously stored olm account.
-
- Loads an account from a pickled base64-encoded string and returns an
- Account object. Decrypts the account using the supplied passphrase.
- Raises OlmAccountError on failure. If the passphrase doesn't match the
- one used to encrypt the account then the error message for the
- exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded
- then the error message will be "INVALID_BASE64".
-
- Args:
- pickle(bytes): Base64 encoded byte string containing the pickled
- account
- passphrase(str, optional): The passphrase used to encrypt the
- account.
- """
- if not pickle:
- raise ValueError("Pickle can't be empty")
-
- byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
- # copy because unpickle will destroy the buffer
- pickle_buffer = ffi.new("char[]", pickle)
-
- obj = cls.__new__(cls)
-
- try:
- ret = lib.olm_unpickle_account(obj._account,
- ffi.from_buffer(byte_key),
- len(byte_key),
- pickle_buffer,
- len(pickle))
- obj._check_error(ret)
- finally:
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- return obj
-
- @property
- def identity_keys(self):
- # type: () -> Dict[str, str]
- """dict: Public part of the identity keys of the account."""
- out_length = lib.olm_account_identity_keys_length(self._account)
- out_buffer = ffi.new("char[]", out_length)
-
- self._check_error(
- lib.olm_account_identity_keys(self._account, out_buffer,
- out_length))
- return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8"))
-
- def sign(self, message):
- # type: (AnyStr) -> str
- """Signs a message with this account.
-
- Signs a message with the private ed25519 identity key of this account.
- Returns the signature.
- Raises OlmAccountError on failure.
-
- Args:
- message(str): The message to sign.
- """
- bytes_message = to_bytearray(message)
- out_length = lib.olm_account_signature_length(self._account)
- out_buffer = ffi.new("char[]", out_length)
-
- try:
- self._check_error(
- lib.olm_account_sign(self._account,
- ffi.from_buffer(bytes_message),
- len(bytes_message), out_buffer,
- out_length))
- finally:
- # clear out copies of the message, which may be plaintext
- if bytes_message is not message:
- for i in range(0, len(bytes_message)):
- bytes_message[i] = 0
-
- return bytes_to_native_str(ffi.unpack(out_buffer, out_length))
-
- @property
- def max_one_time_keys(self):
- # type: () -> int
- """int: The maximum number of one-time keys the account can store."""
- return lib.olm_account_max_number_of_one_time_keys(self._account)
-
- def mark_keys_as_published(self):
- # type: () -> None
- """Mark the current set of one-time keys as being published."""
- lib.olm_account_mark_keys_as_published(self._account)
-
- def generate_one_time_keys(self, count):
- # type: (int) -> None
- """Generate a number of new one-time keys.
-
- If the total number of keys stored by this account exceeds
- max_one_time_keys() then the old keys are discarded.
- Raises OlmAccountError on error.
-
- Args:
- count(int): The number of keys to generate.
- """
- random_length = lib.olm_account_generate_one_time_keys_random_length(
- self._account, count)
- random = URANDOM(random_length)
-
- self._check_error(
- lib.olm_account_generate_one_time_keys(
- self._account, count, ffi.from_buffer(random), random_length))
-
- @property
- def one_time_keys(self):
- # type: () -> Dict[str, Dict[str, str]]
- """dict: The public part of the one-time keys for this account."""
- out_length = lib.olm_account_one_time_keys_length(self._account)
- out_buffer = ffi.new("char[]", out_length)
-
- self._check_error(
- lib.olm_account_one_time_keys(self._account, out_buffer,
- out_length))
-
- return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8"))
-
- def remove_one_time_keys(self, session):
- # type: (Session) -> None
- """Remove used one-time keys.
-
- Removes the one-time keys that the session used from the account.
- Raises OlmAccountError on failure. If the account doesn't have any
- matching one-time keys then the error message of the exception will be
- "BAD_MESSAGE_KEY_ID".
-
- Args:
- session(Session): An Olm Session object that was created with this
- account.
- """
- self._check_error(lib.olm_remove_one_time_keys(self._account,
- session._session))
diff --git a/python/olm/group_session.py b/python/olm/group_session.py
deleted file mode 100644
index 5068192..0000000
--- a/python/olm/group_session.py
+++ /dev/null
@@ -1,532 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2015-2017 OpenMarket Ltd
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""libolm Group session module.
-
-This module contains the group session part of the Olm library. It contains two
-classes for creating inbound and outbound group sessions.
-
-Examples:
- >>> outbound = OutboundGroupSession()
- >>> InboundGroupSession(outbound.session_key)
-"""
-
-# pylint: disable=redefined-builtin,unused-import
-from builtins import bytes, super
-from typing import AnyStr, Optional, Tuple, Type
-
-from future.utils import bytes_to_native_str
-
-# pylint: disable=no-name-in-module
-from _libolm import ffi, lib # type: ignore
-
-from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str
-from ._finalize import track_for_finalization
-
-
-def _clear_inbound_group_session(session):
- # type: (ffi.cdata) -> None
- lib.olm_clear_inbound_group_session(session)
-
-
-def _clear_outbound_group_session(session):
- # type: (ffi.cdata) -> None
- lib.olm_clear_outbound_group_session(session)
-
-
-class OlmGroupSessionError(Exception):
- """libolm Group session error exception."""
-
-
-class InboundGroupSession(object):
- """Inbound group session for encrypted multiuser communication."""
-
- def __new__(
- cls, # type: Type[InboundGroupSession]
- session_key=None # type: Optional[str]
- ):
- # type: (...) -> InboundGroupSession
- obj = super().__new__(cls)
- obj._buf = ffi.new("char[]", lib.olm_inbound_group_session_size())
- obj._session = lib.olm_inbound_group_session(obj._buf)
- track_for_finalization(obj, obj._session, _clear_inbound_group_session)
- return obj
-
- def __init__(self, session_key):
- # type: (AnyStr) -> None
- """Create a new inbound group session.
- Start a new inbound group session, from a key exported from
- an outbound group session.
-
- Raises OlmGroupSessionError on failure. The error message of the
- exception will be "OLM_INVALID_BASE64" if the session key is not valid
- base64 and "OLM_BAD_SESSION_KEY" if the session key is invalid.
- """
- if False: # pragma: no cover
- self._session = self._session # type: ffi.cdata
-
- byte_session_key = to_bytearray(session_key)
-
- try:
- ret = lib.olm_init_inbound_group_session(
- self._session,
- ffi.from_buffer(byte_session_key), len(byte_session_key)
- )
- finally:
- if byte_session_key is not session_key:
- for i in range(0, len(byte_session_key)):
- byte_session_key[i] = 0
- self._check_error(ret)
-
- def pickle(self, passphrase=""):
- # type: (Optional[str]) -> bytes
- """Store an inbound group session.
-
- Stores a group session as a base64 string. Encrypts the session using
- the supplied passphrase. Returns a byte object containing the base64
- encoded string of the pickled session.
-
- Args:
- passphrase(str, optional): The passphrase to be used to encrypt
- the session.
- """
- byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
-
- pickle_length = lib.olm_pickle_inbound_group_session_length(
- self._session)
- pickle_buffer = ffi.new("char[]", pickle_length)
-
- try:
- ret = lib.olm_pickle_inbound_group_session(
- self._session,
- ffi.from_buffer(byte_passphrase), len(byte_passphrase),
- pickle_buffer, pickle_length
- )
- self._check_error(ret)
- finally:
- # clear out copies of the passphrase
- for i in range(0, len(byte_passphrase)):
- byte_passphrase[i] = 0
-
- return ffi.unpack(pickle_buffer, pickle_length)
-
- @classmethod
- def from_pickle(cls, pickle, passphrase=""):
- # type: (bytes, Optional[str]) -> InboundGroupSession
- """Load a previously stored inbound group session.
-
- Loads an inbound group session from a pickled base64 string and returns
- an InboundGroupSession object. Decrypts the session using the supplied
- passphrase. Raises OlmSessionError on failure. If the passphrase
- doesn't match the one used to encrypt the session then the error
- message for the exception will be "BAD_ACCOUNT_KEY". If the base64
- couldn't be decoded then the error message will be "INVALID_BASE64".
-
- Args:
- pickle(bytes): Base64 encoded byte string containing the pickled
- session
- passphrase(str, optional): The passphrase used to encrypt the
- session
- """
- if not pickle:
- raise ValueError("Pickle can't be empty")
-
- byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
- # copy because unpickle will destroy the buffer
- pickle_buffer = ffi.new("char[]", pickle)
-
- obj = cls.__new__(cls)
-
- try:
- ret = lib.olm_unpickle_inbound_group_session(
- obj._session,
- ffi.from_buffer(byte_passphrase),
- len(byte_passphrase),
- pickle_buffer,
- len(pickle)
- )
- obj._check_error(ret)
- finally:
- # clear out copies of the passphrase
- for i in range(0, len(byte_passphrase)):
- byte_passphrase[i] = 0
-
- return obj
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(ffi.string(
- lib.olm_inbound_group_session_last_error(self._session)))
-
- raise OlmGroupSessionError(last_error)
-
- def decrypt(self, ciphertext, unicode_errors="replace"):
- # type: (AnyStr, str) -> Tuple[str, int]
- """Decrypt a message
-
- Returns a tuple of the decrypted plain-text and the message index of
- the decrypted message or raises OlmGroupSessionError on failure.
- On failure the error message of the exception will be:
-
- * OLM_INVALID_BASE64 if the message is not valid base64
- * OLM_BAD_MESSAGE_VERSION if the message was encrypted with an
- unsupported version of the protocol
- * OLM_BAD_MESSAGE_FORMAT if the message headers could not be
- decoded
- * OLM_BAD_MESSAGE_MAC if the message could not be verified
- * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key
- corresponding to the message's index (i.e., it was sent before
- the session key was shared with us)
-
- Args:
- ciphertext(str): Base64 encoded ciphertext containing the encrypted
- message
- unicode_errors(str, optional): The error handling scheme to use for
- unicode decoding errors. The default is "replace" meaning that
- the character that was unable to decode will be replaced with
- the unicode replacement character (U+FFFD). Other possible
- values are "strict", "ignore" and "xmlcharrefreplace" as well
- as any other name registered with codecs.register_error that
- can handle UnicodeEncodeErrors.
- """
- if not ciphertext:
- raise ValueError("Ciphertext can't be empty.")
-
- byte_ciphertext = to_bytes(ciphertext)
-
- # copy because max_plaintext_length will destroy the buffer
- ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
-
- max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length(
- self._session, ciphertext_buffer, len(byte_ciphertext)
- )
- self._check_error(max_plaintext_length)
- plaintext_buffer = ffi.new("char[]", max_plaintext_length)
- # copy because max_plaintext_length will destroy the buffer
- ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
-
- message_index = ffi.new("uint32_t*")
- plaintext_length = lib.olm_group_decrypt(
- self._session, ciphertext_buffer, len(byte_ciphertext),
- plaintext_buffer, max_plaintext_length,
- message_index
- )
-
- self._check_error(plaintext_length)
-
- plaintext = to_unicode_str(
- ffi.unpack(plaintext_buffer, plaintext_length),
- errors=unicode_errors
- )
-
- # clear out copies of the plaintext
- lib.memset(plaintext_buffer, 0, max_plaintext_length)
-
- return plaintext, message_index[0]
-
- @property
- def id(self):
- # type: () -> str
- """str: A base64 encoded identifier for this session."""
- id_length = lib.olm_inbound_group_session_id_length(self._session)
- id_buffer = ffi.new("char[]", id_length)
- ret = lib.olm_inbound_group_session_id(
- self._session,
- id_buffer,
- id_length
- )
- self._check_error(ret)
- return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
-
- @property
- def first_known_index(self):
- # type: () -> int
- """int: The first message index we know how to decrypt."""
- return lib.olm_inbound_group_session_first_known_index(self._session)
-
- def export_session(self, message_index):
- # type: (int) -> str
- """Export an inbound group session
-
- Export the base64-encoded ratchet key for this session, at the given
- index, in a format which can be used by import_session().
-
- Raises OlmGroupSessionError on failure. The error message for the
- exception will be:
-
- * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key
- corresponding to the given index (ie, it was sent before the
- session key was shared with us)
-
- Args:
- message_index(int): The message index at which the session should
- be exported.
- """
-
- export_length = lib.olm_export_inbound_group_session_length(
- self._session)
-
- export_buffer = ffi.new("char[]", export_length)
- ret = lib.olm_export_inbound_group_session(
- self._session,
- export_buffer,
- export_length,
- message_index
- )
- self._check_error(ret)
- export_str = bytes_to_native_str(ffi.unpack(export_buffer, export_length))
-
- # clear out copies of the key
- lib.memset(export_buffer, 0, export_length)
-
- return export_str
-
- @classmethod
- def import_session(cls, session_key):
- # type: (AnyStr) -> InboundGroupSession
- """Create an InboundGroupSession from an exported session key.
-
- Creates an InboundGroupSession with an previously exported session key,
- raises OlmGroupSessionError on failure. The error message for the
- exception will be:
-
- * OLM_INVALID_BASE64 if the session_key is not valid base64
- * OLM_BAD_SESSION_KEY if the session_key is invalid
-
- Args:
- session_key(str): The exported session key with which the inbound
- group session will be created
- """
- obj = cls.__new__(cls)
-
- byte_session_key = to_bytearray(session_key)
-
- try:
- ret = lib.olm_import_inbound_group_session(
- obj._session,
- ffi.from_buffer(byte_session_key),
- len(byte_session_key)
- )
- obj._check_error(ret)
- finally:
- # clear out copies of the key
- if byte_session_key is not session_key:
- for i in range(0, len(byte_session_key)):
- byte_session_key[i] = 0
-
- return obj
-
-
-class OutboundGroupSession(object):
- """Outbound group session for encrypted multiuser communication."""
-
- def __new__(cls):
- # type: (Type[OutboundGroupSession]) -> OutboundGroupSession
- obj = super().__new__(cls)
- obj._buf = ffi.new("char[]", lib.olm_outbound_group_session_size())
- obj._session = lib.olm_outbound_group_session(obj._buf)
- track_for_finalization(
- obj,
- obj._session,
- _clear_outbound_group_session
- )
- return obj
-
- def __init__(self):
- # type: () -> None
- """Create a new outbound group session.
-
- Start a new outbound group session. Raises OlmGroupSessionError on
- failure.
- """
- if False: # pragma: no cover
- self._session = self._session # type: ffi.cdata
-
- random_length = lib.olm_init_outbound_group_session_random_length(
- self._session
- )
- random = URANDOM(random_length)
-
- ret = lib.olm_init_outbound_group_session(
- self._session, ffi.from_buffer(random), random_length
- )
- self._check_error(ret)
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(ffi.string(
- lib.olm_outbound_group_session_last_error(self._session)
- ))
-
- raise OlmGroupSessionError(last_error)
-
- def pickle(self, passphrase=""):
- # type: (Optional[str]) -> bytes
- """Store an outbound group session.
-
- Stores a group session as a base64 string. Encrypts the session using
- the supplied passphrase. Returns a byte object containing the base64
- encoded string of the pickled session.
-
- Args:
- passphrase(str, optional): The passphrase to be used to encrypt
- the session.
- """
- byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
- pickle_length = lib.olm_pickle_outbound_group_session_length(
- self._session)
- pickle_buffer = ffi.new("char[]", pickle_length)
-
- try:
- ret = lib.olm_pickle_outbound_group_session(
- self._session,
- ffi.from_buffer(byte_passphrase), len(byte_passphrase),
- pickle_buffer, pickle_length
- )
- self._check_error(ret)
- finally:
- # clear out copies of the passphrase
- for i in range(0, len(byte_passphrase)):
- byte_passphrase[i] = 0
-
- return ffi.unpack(pickle_buffer, pickle_length)
-
- @classmethod
- def from_pickle(cls, pickle, passphrase=""):
- # type: (bytes, Optional[str]) -> OutboundGroupSession
- """Load a previously stored outbound group session.
-
- Loads an outbound group session from a pickled base64 string and
- returns an OutboundGroupSession object. Decrypts the session using the
- supplied passphrase. Raises OlmSessionError on failure. If the
- passphrase doesn't match the one used to encrypt the session then the
- error message for the exception will be "BAD_ACCOUNT_KEY". If the
- base64 couldn't be decoded then the error message will be
- "INVALID_BASE64".
-
- Args:
- pickle(bytes): Base64 encoded byte string containing the pickled
- session
- passphrase(str, optional): The passphrase used to encrypt the
- """
- if not pickle:
- raise ValueError("Pickle can't be empty")
-
- byte_passphrase = bytearray(passphrase, "utf-8") if passphrase else b""
- # copy because unpickle will destroy the buffer
- pickle_buffer = ffi.new("char[]", pickle)
-
- obj = cls.__new__(cls)
-
- try:
- ret = lib.olm_unpickle_outbound_group_session(
- obj._session,
- ffi.from_buffer(byte_passphrase),
- len(byte_passphrase),
- pickle_buffer,
- len(pickle)
- )
- obj._check_error(ret)
- finally:
- # clear out copies of the passphrase
- for i in range(0, len(byte_passphrase)):
- byte_passphrase[i] = 0
-
- return obj
-
- def encrypt(self, plaintext):
- # type: (AnyStr) -> str
- """Encrypt a message.
-
- Returns the encrypted ciphertext.
-
- Args:
- plaintext(str): A string that will be encrypted using the group
- session.
- """
- byte_plaintext = to_bytearray(plaintext)
- message_length = lib.olm_group_encrypt_message_length(
- self._session, len(byte_plaintext)
- )
-
- message_buffer = ffi.new("char[]", message_length)
-
- try:
- ret = lib.olm_group_encrypt(
- self._session,
- ffi.from_buffer(byte_plaintext), len(byte_plaintext),
- message_buffer, message_length,
- )
- self._check_error(ret)
- finally:
- # clear out copies of plaintext
- if byte_plaintext is not plaintext:
- for i in range(0, len(byte_plaintext)):
- byte_plaintext[i] = 0
-
- return bytes_to_native_str(ffi.unpack(message_buffer, message_length))
-
- @property
- def id(self):
- # type: () -> str
- """str: A base64 encoded identifier for this session."""
- id_length = lib.olm_outbound_group_session_id_length(self._session)
- id_buffer = ffi.new("char[]", id_length)
-
- ret = lib.olm_outbound_group_session_id(
- self._session,
- id_buffer,
- id_length
- )
- self._check_error(ret)
-
- return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
-
- @property
- def message_index(self):
- # type: () -> int
- """int: The current message index of the session.
-
- Each message is encrypted with an increasing index. This is the index
- for the next message.
- """
- return lib.olm_outbound_group_session_message_index(self._session)
-
- @property
- def session_key(self):
- # type: () -> str
- """The base64-encoded current ratchet key for this session.
-
- Each message is encrypted with a different ratchet key. This function
- returns the ratchet key that will be used for the next message.
- """
- key_length = lib.olm_outbound_group_session_key_length(self._session)
- key_buffer = ffi.new("char[]", key_length)
-
- ret = lib.olm_outbound_group_session_key(
- self._session,
- key_buffer,
- key_length
- )
- self._check_error(ret)
-
- return bytes_to_native_str(ffi.unpack(key_buffer, key_length))
diff --git a/python/olm/pk.py b/python/olm/pk.py
deleted file mode 100644
index 4352359..0000000
--- a/python/olm/pk.py
+++ /dev/null
@@ -1,461 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""libolm PK module.
-
-This module contains bindings to the PK part of the Olm library.
-It contains two classes PkDecryption and PkEncryption that are used to
-establish an encrypted communication channel using public key encryption,
-as well as a class PkSigning that is used to sign a message.
-
-Examples:
- >>> decryption = PkDecryption()
- >>> encryption = PkEncryption(decryption.public_key)
- >>> plaintext = "It's a secret to everybody."
- >>> message = encryption.encrypt(plaintext)
- >>> decrypted_plaintext = decryption.decrypt(message)
- >>> seed = PkSigning.generate_seed()
- >>> signing = PkSigning(seed)
- >>> signature = signing.sign(plaintext)
- >>> ed25519_verify(signing.public_key, plaintext, signature)
-
-"""
-
-from builtins import super
-from typing import AnyStr, Type
-
-from future.utils import bytes_to_native_str
-
-from _libolm import ffi, lib # type: ignore
-
-from ._compat import URANDOM, to_bytearray, to_unicode_str
-from ._finalize import track_for_finalization
-
-
-class PkEncryptionError(Exception):
- """libolm Pk encryption exception."""
-
-
-class PkDecryptionError(Exception):
- """libolm Pk decryption exception."""
-
-
-class PkSigningError(Exception):
- """libolm Pk signing exception."""
-
-
-def _clear_pk_encryption(pk_struct):
- lib.olm_clear_pk_encryption(pk_struct)
-
-
-class PkMessage(object):
- """A PK encrypted message."""
-
- def __init__(self, ephemeral_key, mac, ciphertext):
- # type: (str, str, str) -> None
- """Create a new PK encrypted message.
-
- Args:
- ephemeral_key(str): the public part of the ephemeral key
- used (together with the recipient's key) to generate a symmetric
- encryption key.
- mac(str): Message Authentication Code of the encrypted message
- ciphertext(str): The cipher text of the encrypted message
- """
- self.ephemeral_key = ephemeral_key
- self.mac = mac
- self.ciphertext = ciphertext
-
-
-class PkEncryption(object):
- """PkEncryption class.
-
- Represents the decryption part of a PK encrypted channel.
- """
-
- def __init__(self, recipient_key):
- # type: (AnyStr) -> None
- """Create a new PK encryption object.
-
- Args:
- recipient_key(str): a public key that will be used for encryption
- """
- if not recipient_key:
- raise ValueError("Recipient key can't be empty")
-
- self._buf = ffi.new("char[]", lib.olm_pk_encryption_size())
- self._pk_encryption = lib.olm_pk_encryption(self._buf)
- track_for_finalization(self, self._pk_encryption, _clear_pk_encryption)
-
- byte_key = to_bytearray(recipient_key)
- lib.olm_pk_encryption_set_recipient_key(
- self._pk_encryption,
- ffi.from_buffer(byte_key),
- len(byte_key)
- )
-
- # clear out copies of the key
- if byte_key is not recipient_key: # pragma: no cover
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- def _check_error(self, ret): # pragma: no cover
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(
- ffi.string(lib.olm_pk_encryption_last_error(self._pk_encryption)))
-
- raise PkEncryptionError(last_error)
-
- def encrypt(self, plaintext):
- # type: (AnyStr) -> PkMessage
- """Encrypt a message.
-
- Returns the encrypted PkMessage.
-
- Args:
- plaintext(str): A string that will be encrypted using the
- PkEncryption object.
- """
- byte_plaintext = to_bytearray(plaintext)
-
- r_length = lib.olm_pk_encrypt_random_length(self._pk_encryption)
- random = URANDOM(r_length)
- random_buffer = ffi.new("char[]", random)
-
- ciphertext_length = lib.olm_pk_ciphertext_length(
- self._pk_encryption, len(byte_plaintext)
- )
- ciphertext = ffi.new("char[]", ciphertext_length)
-
- mac_length = lib.olm_pk_mac_length(self._pk_encryption)
- mac = ffi.new("char[]", mac_length)
-
- ephemeral_key_size = lib.olm_pk_key_length()
- ephemeral_key = ffi.new("char[]", ephemeral_key_size)
-
- ret = lib.olm_pk_encrypt(
- self._pk_encryption,
- ffi.from_buffer(byte_plaintext), len(byte_plaintext),
- ciphertext, ciphertext_length,
- mac, mac_length,
- ephemeral_key, ephemeral_key_size,
- random_buffer, r_length
- )
-
- try:
- self._check_error(ret)
- finally: # pragma: no cover
- # clear out copies of plaintext
- if byte_plaintext is not plaintext:
- for i in range(0, len(byte_plaintext)):
- byte_plaintext[i] = 0
-
- message = PkMessage(
- bytes_to_native_str(
- ffi.unpack(ephemeral_key, ephemeral_key_size)),
- bytes_to_native_str(
- ffi.unpack(mac, mac_length)),
- bytes_to_native_str(
- ffi.unpack(ciphertext, ciphertext_length))
- )
- return message
-
-
-def _clear_pk_decryption(pk_struct):
- lib.olm_clear_pk_decryption(pk_struct)
-
-
-class PkDecryption(object):
- """PkDecryption class.
-
- Represents the decryption part of a PK encrypted channel.
-
- Attributes:
- public_key (str): The public key of the PkDecryption object, can be
- shared and used to create a PkEncryption object.
-
- """
-
- def __new__(cls):
- # type: (Type[PkDecryption]) -> PkDecryption
- obj = super().__new__(cls)
- obj._buf = ffi.new("char[]", lib.olm_pk_decryption_size())
- obj._pk_decryption = lib.olm_pk_decryption(obj._buf)
- obj.public_key = None
- track_for_finalization(obj, obj._pk_decryption, _clear_pk_decryption)
- return obj
-
- def __init__(self):
- if False: # pragma: no cover
- self._pk_decryption = self._pk_decryption # type: ffi.cdata
-
- random_length = lib.olm_pk_private_key_length()
- random = URANDOM(random_length)
- random_buffer = ffi.new("char[]", random)
-
- key_length = lib.olm_pk_key_length()
- key_buffer = ffi.new("char[]", key_length)
-
- ret = lib.olm_pk_key_from_private(
- self._pk_decryption,
- key_buffer, key_length,
- random_buffer, random_length
- )
- self._check_error(ret)
- self.public_key = bytes_to_native_str(ffi.unpack(
- key_buffer,
- key_length
- ))
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(
- ffi.string(lib.olm_pk_decryption_last_error(self._pk_decryption)))
-
- raise PkDecryptionError(last_error)
-
- def pickle(self, passphrase=""):
- # type: (str) -> bytes
- """Store a PkDecryption object.
-
- Stores a PkDecryption object as a base64 string. Encrypts the object
- using the supplied passphrase. Returns a byte object containing the
- base64 encoded string of the pickled session.
-
- Args:
- passphrase(str, optional): The passphrase to be used to encrypt
- the object.
- """
- byte_key = to_bytearray(passphrase)
-
- pickle_length = lib.olm_pickle_pk_decryption_length(
- self._pk_decryption
- )
- pickle_buffer = ffi.new("char[]", pickle_length)
-
- ret = lib.olm_pickle_pk_decryption(
- self._pk_decryption,
- ffi.from_buffer(byte_key), len(byte_key),
- pickle_buffer, pickle_length
- )
- try:
- self._check_error(ret)
- finally:
- # zero out copies of the passphrase
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- return ffi.unpack(pickle_buffer, pickle_length)
-
- @classmethod
- def from_pickle(cls, pickle, passphrase=""):
- # types: (bytes, str) -> PkDecryption
- """Restore a previously stored PkDecryption object.
-
- Creates a PkDecryption object from a pickled base64 string. Decrypts
- the pickled object using the supplied passphrase.
- Raises PkDecryptionError on failure. If the passphrase
- doesn't match the one used to encrypt the session then the error
- message for the exception will be "BAD_ACCOUNT_KEY". If the base64
- couldn't be decoded then the error message will be "INVALID_BASE64".
-
- Args:
- pickle(bytes): Base64 encoded byte string containing the pickled
- PkDecryption object
- passphrase(str, optional): The passphrase used to encrypt the
- object
- """
- if not pickle:
- raise ValueError("Pickle can't be empty")
-
- byte_key = to_bytearray(passphrase)
- pickle_buffer = ffi.new("char[]", pickle)
-
- pubkey_length = lib.olm_pk_key_length()
- pubkey_buffer = ffi.new("char[]", pubkey_length)
-
- obj = cls.__new__(cls)
-
- ret = lib.olm_unpickle_pk_decryption(
- obj._pk_decryption,
- ffi.from_buffer(byte_key), len(byte_key),
- pickle_buffer, len(pickle),
- pubkey_buffer, pubkey_length)
-
- try:
- obj._check_error(ret)
- finally:
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- obj.public_key = bytes_to_native_str(ffi.unpack(
- pubkey_buffer,
- pubkey_length
- ))
-
- return obj
-
- def decrypt(self, message, unicode_errors="replace"):
- # type (PkMessage, str) -> str
- """Decrypt a previously encrypted Pk message.
-
- Returns the decrypted plaintext.
- Raises PkDecryptionError on failure.
-
- Args:
- message(PkMessage): the pk message to decrypt.
- unicode_errors(str, optional): The error handling scheme to use for
- unicode decoding errors. The default is "replace" meaning that
- the character that was unable to decode will be replaced with
- the unicode replacement character (U+FFFD). Other possible
- values are "strict", "ignore" and "xmlcharrefreplace" as well
- as any other name registered with codecs.register_error that
- can handle UnicodeEncodeErrors.
- """
- ephemeral_key = to_bytearray(message.ephemeral_key)
- ephemeral_key_size = len(ephemeral_key)
-
- mac = to_bytearray(message.mac)
- mac_length = len(mac)
-
- ciphertext = to_bytearray(message.ciphertext)
- ciphertext_length = len(ciphertext)
-
- max_plaintext_length = lib.olm_pk_max_plaintext_length(
- self._pk_decryption,
- ciphertext_length
- )
- plaintext_buffer = ffi.new("char[]", max_plaintext_length)
-
- ret = lib.olm_pk_decrypt(
- self._pk_decryption,
- ffi.from_buffer(ephemeral_key), ephemeral_key_size,
- ffi.from_buffer(mac), mac_length,
- ffi.from_buffer(ciphertext), ciphertext_length,
- plaintext_buffer, max_plaintext_length)
- self._check_error(ret)
-
- plaintext = (ffi.unpack(
- plaintext_buffer,
- ret
- ))
-
- # clear out copies of the plaintext
- lib.memset(plaintext_buffer, 0, max_plaintext_length)
-
- return to_unicode_str(plaintext, errors=unicode_errors)
-
-
-def _clear_pk_signing(pk_struct):
- lib.olm_clear_pk_signing(pk_struct)
-
-
-class PkSigning(object):
- """PkSigning class.
-
- Signs messages using public key cryptography.
-
- Attributes:
- public_key (str): The public key of the PkSigning object, can be
- shared and used to verify using Utility.ed25519_verify.
-
- """
-
- def __init__(self, seed):
- # type: (bytes) -> None
- """Create a new signing object.
-
- Args:
- seed(bytes): the seed to use as the private key for signing. The
- seed must have the same length as the seeds generated by
- PkSigning.generate_seed().
- """
- if not seed:
- raise ValueError("seed can't be empty")
-
- self._buf = ffi.new("char[]", lib.olm_pk_signing_size())
- self._pk_signing = lib.olm_pk_signing(self._buf)
- track_for_finalization(self, self._pk_signing, _clear_pk_signing)
-
- seed_buffer = ffi.new("char[]", seed)
-
- pubkey_length = lib.olm_pk_signing_public_key_length()
- pubkey_buffer = ffi.new("char[]", pubkey_length)
-
- ret = lib.olm_pk_signing_key_from_seed(
- self._pk_signing,
- pubkey_buffer, pubkey_length,
- seed_buffer, len(seed)
- )
-
- # zero out copies of the seed
- lib.memset(seed_buffer, 0, len(seed))
-
- self._check_error(ret)
-
- self.public_key = bytes_to_native_str(
- ffi.unpack(pubkey_buffer, pubkey_length)
- )
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(
- ffi.string(lib.olm_pk_signing_last_error(self._pk_signing)))
-
- raise PkSigningError(last_error)
-
- @classmethod
- def generate_seed(cls):
- # type: () -> bytes
- """Generate a random seed.
- """
- random_length = lib.olm_pk_signing_seed_length()
- random = URANDOM(random_length)
-
- return random
-
- def sign(self, message):
- # type: (AnyStr) -> str
- """Sign a message
-
- Returns the signature.
- Raises PkSigningError on failure.
-
- Args:
- message(str): the message to sign.
- """
- bytes_message = to_bytearray(message)
-
- signature_length = lib.olm_pk_signature_length()
- signature_buffer = ffi.new("char[]", signature_length)
-
- ret = lib.olm_pk_sign(
- self._pk_signing,
- ffi.from_buffer(bytes_message), len(bytes_message),
- signature_buffer, signature_length)
- self._check_error(ret)
-
- return bytes_to_native_str(
- ffi.unpack(signature_buffer, signature_length)
- )
diff --git a/python/olm/sas.py b/python/olm/sas.py
deleted file mode 100644
index cf2a443..0000000
--- a/python/olm/sas.py
+++ /dev/null
@@ -1,245 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2019 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""libolm SAS module.
-
-This module contains functions to perform key verification using the Short
-Authentication String (SAS) method.
-
-Examples:
- >>> sas = Sas()
- >>> bob_key = "3p7bfXt9wbTTW2HC7OQ1Nz+DQ8hbeGdNrfx+FG+IK08"
- >>> message = "Hello world!"
- >>> extra_info = "MAC"
- >>> sas_alice.set_their_pubkey(bob_key)
- >>> sas_alice.calculate_mac(message, extra_info)
- >>> sas_alice.generate_bytes(extra_info, 5)
-
-"""
-
-from builtins import bytes
-from functools import wraps
-from typing import Optional
-
-from future.utils import bytes_to_native_str
-
-from _libolm import ffi, lib
-
-from ._compat import URANDOM, to_bytearray, to_bytes
-from ._finalize import track_for_finalization
-
-
-def _clear_sas(sas):
- # type: (ffi.cdata) -> None
- lib.olm_clear_sas(sas)
-
-
-class OlmSasError(Exception):
- """libolm Sas error exception."""
-
-
-class Sas(object):
- """libolm Short Authenticaton String (SAS) class."""
-
- def __init__(self, other_users_pubkey=None):
- # type: (Optional[str]) -> None
- """Create a new SAS object.
-
- Args:
- other_users_pubkey(str, optional): The other users public key, this
- key is necesary to generate bytes for the authentication string
- as well as to calculate the MAC.
-
- Raises OlmSasError on failure.
-
- """
- self._buf = ffi.new("char[]", lib.olm_sas_size())
- self._sas = lib.olm_sas(self._buf)
- track_for_finalization(self, self._sas, _clear_sas)
-
- random_length = lib.olm_create_sas_random_length(self._sas)
- random = URANDOM(random_length)
-
- self._create_sas(random, random_length)
-
- if other_users_pubkey:
- self.set_their_pubkey(other_users_pubkey)
-
- def _create_sas(self, buffer, buffer_length):
- self._check_error(
- lib.olm_create_sas(
- self._sas,
- ffi.from_buffer(buffer),
- buffer_length
- )
- )
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(
- ffi.string((lib.olm_sas_last_error(self._sas))))
-
- raise OlmSasError(last_error)
-
- @property
- def pubkey(self):
- # type: () -> str
- """Get the public key for the SAS object.
-
- This returns the public key of the SAS object that can then be shared
- with another user to perform the authentication process.
-
- Raises OlmSasError on failure.
-
- """
- pubkey_length = lib.olm_sas_pubkey_length(self._sas)
- pubkey_buffer = ffi.new("char[]", pubkey_length)
-
- self._check_error(
- lib.olm_sas_get_pubkey(self._sas, pubkey_buffer, pubkey_length)
- )
-
- return bytes_to_native_str(ffi.unpack(pubkey_buffer, pubkey_length))
-
- @property
- def other_key_set(self):
- # type: () -> bool
- """Check if the other user's pubkey has been set.
- """
- return lib.olm_sas_is_their_key_set(self._sas) == 1
-
- def set_their_pubkey(self, key):
- # type: (str) -> None
- """Set the public key of the other user.
-
- This sets the public key of the other user, it needs to be set before
- bytes can be generated for the authentication string and a MAC can be
- calculated.
-
- Args:
- key (str): The other users public key.
-
- Raises OlmSasError on failure.
-
- """
- byte_key = to_bytearray(key)
-
- self._check_error(
- lib.olm_sas_set_their_key(
- self._sas,
- ffi.from_buffer(byte_key),
- len(byte_key)
- )
- )
-
- def generate_bytes(self, extra_info, length):
- # type: (str, int) -> bytes
- """Generate bytes to use for the short authentication string.
-
- Args:
- extra_info (str): Extra information to mix in when generating the
- bytes.
- length (int): The number of bytes to generate.
-
- Raises OlmSasError if the other users persons public key isn't set or
- an internal Olm error happens.
-
- """
- if length < 1:
- raise ValueError("The length needs to be a positive integer value")
-
- byte_info = to_bytearray(extra_info)
- out_buffer = ffi.new("char[]", length)
-
- self._check_error(
- lib.olm_sas_generate_bytes(
- self._sas,
- ffi.from_buffer(byte_info),
- len(byte_info),
- out_buffer,
- length
- )
- )
-
- return ffi.unpack(out_buffer, length)
-
- def calculate_mac(self, message, extra_info):
- # type: (str, str) -> str
- """Generate a message authentication code based on the shared secret.
-
- Args:
- message (str): The message to produce the authentication code for.
- extra_info (str): Extra information to mix in when generating the
- MAC
-
- Raises OlmSasError on failure.
-
- """
- byte_message = to_bytes(message)
- byte_info = to_bytes(extra_info)
-
- mac_length = lib.olm_sas_mac_length(self._sas)
- mac_buffer = ffi.new("char[]", mac_length)
-
- self._check_error(
- lib.olm_sas_calculate_mac(
- self._sas,
- ffi.from_buffer(byte_message),
- len(byte_message),
- ffi.from_buffer(byte_info),
- len(byte_info),
- mac_buffer,
- mac_length
- )
- )
- return bytes_to_native_str(ffi.unpack(mac_buffer, mac_length))
-
- def calculate_mac_long_kdf(self, message, extra_info):
- # type: (str, str) -> str
- """Generate a message authentication code based on the shared secret.
-
- This function should not be used unless compatibility with an older
- non-tagged Olm version is required.
-
- Args:
- message (str): The message to produce the authentication code for.
- extra_info (str): Extra information to mix in when generating the
- MAC
-
- Raises OlmSasError on failure.
-
- """
- byte_message = to_bytes(message)
- byte_info = to_bytes(extra_info)
-
- mac_length = lib.olm_sas_mac_length(self._sas)
- mac_buffer = ffi.new("char[]", mac_length)
-
- self._check_error(
- lib.olm_sas_calculate_mac_long_kdf(
- self._sas,
- ffi.from_buffer(byte_message),
- len(byte_message),
- ffi.from_buffer(byte_info),
- len(byte_info),
- mac_buffer,
- mac_length
- )
- )
- return bytes_to_native_str(ffi.unpack(mac_buffer, mac_length))
diff --git a/python/olm/session.py b/python/olm/session.py
deleted file mode 100644
index 636eb3d..0000000
--- a/python/olm/session.py
+++ /dev/null
@@ -1,495 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2015-2017 OpenMarket Ltd
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""libolm Session module.
-
-This module contains the Olm Session part of the Olm library.
-
-It is used to establish a peer-to-peer encrypted communication channel between
-two Olm accounts.
-
-Examples:
- >>> alice = Account()
- >>> bob = Account()
- >>> bob.generate_one_time_keys(1)
- >>> id_key = bob.identity_keys['curve25519']
- >>> one_time = list(bob.one_time_keys["curve25519"].values())[0]
- >>> session = OutboundSession(alice, id_key, one_time)
-
-"""
-
-# pylint: disable=redefined-builtin,unused-import
-from builtins import bytes, super
-from typing import AnyStr, Optional, Type
-
-from future.utils import bytes_to_native_str
-
-# pylint: disable=no-name-in-module
-from _libolm import ffi, lib # type: ignore
-
-from ._compat import URANDOM, to_bytearray, to_bytes, to_unicode_str
-from ._finalize import track_for_finalization
-
-# This is imported only for type checking purposes
-if False:
- from .account import Account # pragma: no cover
-
-
-class OlmSessionError(Exception):
- """libolm Session exception."""
-
-
-class _OlmMessage(object):
- def __init__(self, ciphertext, message_type):
- # type: (AnyStr, ffi.cdata) -> None
- if not ciphertext:
- raise ValueError("Ciphertext can't be empty")
-
- # I don't know why mypy wants a type annotation here nor why AnyStr
- # doesn't work
- self.ciphertext = ciphertext # type: ignore
- self.message_type = message_type
-
- def __str__(self):
- # type: () -> str
- type_to_prefix = {
- lib.OLM_MESSAGE_TYPE_PRE_KEY: "PRE_KEY",
- lib.OLM_MESSAGE_TYPE_MESSAGE: "MESSAGE"
- }
-
- prefix = type_to_prefix[self.message_type]
- return "{} {}".format(prefix, self.ciphertext)
-
-
-class OlmPreKeyMessage(_OlmMessage):
- """Olm prekey message class
-
- Prekey messages are used to establish an Olm session. After the first
- message exchange the session switches to normal messages
- """
-
- def __init__(self, ciphertext):
- # type: (AnyStr) -> None
- """Create a new Olm prekey message with the supplied ciphertext
-
- Args:
- ciphertext(str): The ciphertext of the prekey message.
- """
- _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_PRE_KEY)
-
- def __repr__(self):
- # type: () -> str
- return "OlmPreKeyMessage({})".format(self.ciphertext)
-
-
-class OlmMessage(_OlmMessage):
- """Olm message class"""
-
- def __init__(self, ciphertext):
- # type: (AnyStr) -> None
- """Create a new Olm message with the supplied ciphertext
-
- Args:
- ciphertext(str): The ciphertext of the message.
- """
- _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_MESSAGE)
-
- def __repr__(self):
- # type: () -> str
- return "OlmMessage({})".format(self.ciphertext)
-
-
-def _clear_session(session):
- # type: (ffi.cdata) -> None
- lib.olm_clear_session(session)
-
-
-class Session(object):
- """libolm Session class.
- This is an abstract class that can't be instantiated except when unpickling
- a previously pickled InboundSession or OutboundSession object with
- from_pickle.
- """
-
- def __new__(cls):
- # type: (Type[Session]) -> Session
-
- obj = super().__new__(cls)
- obj._buf = ffi.new("char[]", lib.olm_session_size())
- obj._session = lib.olm_session(obj._buf)
- track_for_finalization(obj, obj._session, _clear_session)
- return obj
-
- def __init__(self):
- # type: () -> None
- if type(self) is Session:
- raise TypeError("Session class may not be instantiated.")
-
- if False:
- self._session = self._session # type: ffi.cdata
-
- def _check_error(self, ret):
- # type: (int) -> None
- if ret != lib.olm_error():
- return
-
- last_error = bytes_to_native_str(
- ffi.string(lib.olm_session_last_error(self._session)))
-
- raise OlmSessionError(last_error)
-
- def pickle(self, passphrase=""):
- # type: (Optional[str]) -> bytes
- """Store an Olm session.
-
- Stores a session as a base64 string. Encrypts the session using the
- supplied passphrase. Returns a byte object containing the base64
- encoded string of the pickled session. Raises OlmSessionError on
- failure.
-
- Args:
- passphrase(str, optional): The passphrase to be used to encrypt
- the session.
- """
- byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
-
- pickle_length = lib.olm_pickle_session_length(self._session)
- pickle_buffer = ffi.new("char[]", pickle_length)
-
- try:
- self._check_error(
- lib.olm_pickle_session(self._session,
- ffi.from_buffer(byte_key),
- len(byte_key),
- pickle_buffer, pickle_length))
- finally:
- # clear out copies of the passphrase
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- return ffi.unpack(pickle_buffer, pickle_length)
-
- @classmethod
- def from_pickle(cls, pickle, passphrase=""):
- # type: (bytes, Optional[str]) -> Session
- """Load a previously stored Olm session.
-
- Loads a session from a pickled base64 string and returns a Session
- object. Decrypts the session using the supplied passphrase. Raises
- OlmSessionError on failure. If the passphrase doesn't match the one
- used to encrypt the session then the error message for the
- exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded
- then the error message will be "INVALID_BASE64".
-
- Args:
- pickle(bytes): Base64 encoded byte string containing the pickled
- session
- passphrase(str, optional): The passphrase used to encrypt the
- session.
- """
- if not pickle:
- raise ValueError("Pickle can't be empty")
-
- byte_key = bytearray(passphrase, "utf-8") if passphrase else b""
- # copy because unpickle will destroy the buffer
- pickle_buffer = ffi.new("char[]", pickle)
-
- session = cls.__new__(cls)
-
- try:
- ret = lib.olm_unpickle_session(session._session,
- ffi.from_buffer(byte_key),
- len(byte_key),
- pickle_buffer,
- len(pickle))
- session._check_error(ret)
- finally:
- # clear out copies of the passphrase
- for i in range(0, len(byte_key)):
- byte_key[i] = 0
-
- return session
-
- def encrypt(self, plaintext):
- # type: (AnyStr) -> _OlmMessage
- """Encrypts a message using the session. Returns the ciphertext as a
- base64 encoded string on success. Raises OlmSessionError on failure.
-
- Args:
- plaintext(str): The plaintext message that will be encrypted.
- """
- byte_plaintext = to_bytearray(plaintext)
-
- r_length = lib.olm_encrypt_random_length(self._session)
- random = URANDOM(r_length)
-
- try:
- message_type = lib.olm_encrypt_message_type(self._session)
-
- self._check_error(message_type)
-
- ciphertext_length = lib.olm_encrypt_message_length(
- self._session, len(byte_plaintext)
- )
- ciphertext_buffer = ffi.new("char[]", ciphertext_length)
-
- self._check_error(lib.olm_encrypt(
- self._session,
- ffi.from_buffer(byte_plaintext), len(byte_plaintext),
- ffi.from_buffer(random), r_length,
- ciphertext_buffer, ciphertext_length,
- ))
- finally:
- # clear out copies of plaintext
- if byte_plaintext is not plaintext:
- for i in range(0, len(byte_plaintext)):
- byte_plaintext[i] = 0
-
- if message_type == lib.OLM_MESSAGE_TYPE_PRE_KEY:
- return OlmPreKeyMessage(
- bytes_to_native_str(ffi.unpack(
- ciphertext_buffer,
- ciphertext_length
- )))
- elif message_type == lib.OLM_MESSAGE_TYPE_MESSAGE:
- return OlmMessage(
- bytes_to_native_str(ffi.unpack(
- ciphertext_buffer,
- ciphertext_length
- )))
- else: # pragma: no cover
- raise ValueError("Unknown message type")
-
- def decrypt(self, message, unicode_errors="replace"):
- # type: (_OlmMessage, str) -> str
- """Decrypts a message using the session. Returns the plaintext string
- on success. Raises OlmSessionError on failure. If the base64 couldn't
- be decoded then the error message will be "INVALID_BASE64". If the
- message is for an unsupported version of the protocol the error message
- will be "BAD_MESSAGE_VERSION". If the message couldn't be decoded then
- the error message will be "BAD_MESSAGE_FORMAT". If the MAC on the
- message was invalid then the error message will be "BAD_MESSAGE_MAC".
-
- Args:
- message(OlmMessage): The Olm message that will be decrypted. It can
- be either an OlmPreKeyMessage or an OlmMessage.
- unicode_errors(str, optional): The error handling scheme to use for
- unicode decoding errors. The default is "replace" meaning that
- the character that was unable to decode will be replaced with
- the unicode replacement character (U+FFFD). Other possible
- values are "strict", "ignore" and "xmlcharrefreplace" as well
- as any other name registered with codecs.register_error that
- can handle UnicodeEncodeErrors.
- """
- if not message.ciphertext:
- raise ValueError("Ciphertext can't be empty")
-
- byte_ciphertext = to_bytes(message.ciphertext)
- # make a copy the ciphertext buffer, because
- # olm_decrypt_max_plaintext_length wants to destroy something
- ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
-
- max_plaintext_length = lib.olm_decrypt_max_plaintext_length(
- self._session, message.message_type, ciphertext_buffer,
- len(byte_ciphertext)
- )
- self._check_error(max_plaintext_length)
- plaintext_buffer = ffi.new("char[]", max_plaintext_length)
-
- # make a copy the ciphertext buffer, because
- # olm_decrypt_max_plaintext_length wants to destroy something
- ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
- plaintext_length = lib.olm_decrypt(
- self._session, message.message_type,
- ciphertext_buffer, len(byte_ciphertext),
- plaintext_buffer, max_plaintext_length
- )
- self._check_error(plaintext_length)
- plaintext = to_unicode_str(
- ffi.unpack(plaintext_buffer, plaintext_length),
- errors=unicode_errors
- )
-
- # clear out copies of the plaintext
- lib.memset(plaintext_buffer, 0, max_plaintext_length)
-
- return plaintext
-
- @property
- def id(self):
- # type: () -> str
- """str: An identifier for this session. Will be the same for both
- ends of the conversation.
- """
- id_length = lib.olm_session_id_length(self._session)
- id_buffer = ffi.new("char[]", id_length)
-
- self._check_error(
- lib.olm_session_id(self._session, id_buffer, id_length)
- )
- return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
-
- def matches(self, message, identity_key=None):
- # type: (OlmPreKeyMessage, Optional[AnyStr]) -> bool
- """Checks if the PRE_KEY message is for this in-bound session.
- This can happen if multiple messages are sent to this session before
- this session sends a message in reply. Returns True if the session
- matches. Returns False if the session does not match. Raises
- OlmSessionError on failure. If the base64 couldn't be decoded then the
- error message will be "INVALID_BASE64". If the message was for an
- unsupported protocol version then the error message will be
- "BAD_MESSAGE_VERSION". If the message couldn't be decoded then then the
- error message will be * "BAD_MESSAGE_FORMAT".
-
- Args:
- message(OlmPreKeyMessage): The Olm prekey message that will checked
- if it is intended for this session.
- identity_key(str, optional): The identity key of the sender. To
- check if the message was also sent using this identity key.
- """
- if not isinstance(message, OlmPreKeyMessage):
- raise TypeError("Matches can only be called with prekey messages.")
-
- if not message.ciphertext:
- raise ValueError("Ciphertext can't be empty")
-
- ret = None
-
- byte_ciphertext = to_bytes(message.ciphertext)
- # make a copy, because olm_matches_inbound_session(_from) will distroy
- # it
- message_buffer = ffi.new("char[]", byte_ciphertext)
-
- if identity_key:
- byte_id_key = to_bytes(identity_key)
-
- ret = lib.olm_matches_inbound_session_from(
- self._session,
- ffi.from_buffer(byte_id_key), len(byte_id_key),
- message_buffer, len(byte_ciphertext)
- )
-
- else:
- ret = lib.olm_matches_inbound_session(
- self._session,
- message_buffer, len(byte_ciphertext))
-
- self._check_error(ret)
-
- return bool(ret)
-
-
-class InboundSession(Session):
- """Inbound Olm session for p2p encrypted communication.
- """
-
- def __new__(cls, account, message, identity_key=None):
- # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> Session
- return super().__new__(cls)
-
- def __init__(self, account, message, identity_key=None):
- # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> None
- """Create a new inbound Olm session.
-
- Create a new in-bound session for sending/receiving messages from an
- incoming prekey message. Raises OlmSessionError on failure. If the
- base64 couldn't be decoded then error message will be "INVALID_BASE64".
- If the message was for an unsupported protocol version then
- the errror message will be "BAD_MESSAGE_VERSION". If the message
- couldn't be decoded then then the error message will be
- "BAD_MESSAGE_FORMAT". If the message refers to an unknown one-time
- key then the error message will be "BAD_MESSAGE_KEY_ID".
-
- Args:
- account(Account): The Olm Account that will be used to create this
- session.
- message(OlmPreKeyMessage): The Olm prekey message that will checked
- that will be used to create this session.
- identity_key(str, optional): The identity key of the sender. To
- check if the message was also sent using this identity key.
- """
- if not message.ciphertext:
- raise ValueError("Ciphertext can't be empty")
-
- super().__init__()
- byte_ciphertext = to_bytes(message.ciphertext)
- message_buffer = ffi.new("char[]", byte_ciphertext)
-
- if identity_key:
- byte_id_key = to_bytes(identity_key)
- identity_key_buffer = ffi.new("char[]", byte_id_key)
- self._check_error(lib.olm_create_inbound_session_from(
- self._session,
- account._account,
- identity_key_buffer, len(byte_id_key),
- message_buffer, len(byte_ciphertext)
- ))
- else:
- self._check_error(lib.olm_create_inbound_session(
- self._session,
- account._account,
- message_buffer, len(byte_ciphertext)
- ))
-
-
-class OutboundSession(Session):
- """Outbound Olm session for p2p encrypted communication."""
-
- def __new__(cls, account, identity_key, one_time_key):
- # type: (Account, AnyStr, AnyStr) -> Session
- return super().__new__(cls)
-
- def __init__(self, account, identity_key, one_time_key):
- # type: (Account, AnyStr, AnyStr) -> None
- """Create a new outbound Olm session.
-
- Creates a new outbound session for sending messages to a given
- identity key and one-time key.
-
- Raises OlmSessionError on failure. If the keys couldn't be decoded as
- base64 then the error message will be "INVALID_BASE64".
-
- Args:
- account(Account): The Olm Account that will be used to create this
- session.
- identity_key(str): The identity key of the person with whom we want
- to start the session.
- one_time_key(str): A one-time key from the person with whom we want
- to start the session.
- """
- if not identity_key:
- raise ValueError("Identity key can't be empty")
-
- if not one_time_key:
- raise ValueError("One-time key can't be empty")
-
- super().__init__()
-
- byte_id_key = to_bytes(identity_key)
- byte_one_time = to_bytes(one_time_key)
-
- session_random_length = lib.olm_create_outbound_session_random_length(
- self._session)
-
- random = URANDOM(session_random_length)
-
- self._check_error(lib.olm_create_outbound_session(
- self._session,
- account._account,
- ffi.from_buffer(byte_id_key), len(byte_id_key),
- ffi.from_buffer(byte_one_time), len(byte_one_time),
- ffi.from_buffer(random), session_random_length
- ))
diff --git a/python/olm/utility.py b/python/olm/utility.py
deleted file mode 100644
index bddef38..0000000
--- a/python/olm/utility.py
+++ /dev/null
@@ -1,151 +0,0 @@
-# -*- coding: utf-8 -*-
-# libolm python bindings
-# Copyright © 2015-2017 OpenMarket Ltd
-# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""libolm Utility module.
-
-This module contains utilities for olm.
-It only contains the ed25519_verify function for signature verification.
-
-Examples:
- >>> alice = Account()
-
- >>> message = "Test"
- >>> signature = alice.sign(message)
- >>> signing_key = alice.identity_keys["ed25519"]
-
- >>> ed25519_verify(signing_key, message, signature)
-
-"""
-
-# pylint: disable=redefined-builtin,unused-import
-from typing import AnyStr, Type
-
-from future.utils import bytes_to_native_str
-
-# pylint: disable=no-name-in-module
-from _libolm import ffi, lib # type: ignore
-
-from ._compat import to_bytearray, to_bytes
-from ._finalize import track_for_finalization
-
-
-def _clear_utility(utility): # pragma: no cover
- # type: (ffi.cdata) -> None
- lib.olm_clear_utility(utility)
-
-
-class OlmVerifyError(Exception):
- """libolm signature verification exception."""
-
-
-class OlmHashError(Exception):
- """libolm hash calculation exception."""
-
-
-class _Utility(object):
- # pylint: disable=too-few-public-methods
- """libolm Utility class."""
-
- _buf = None
- _utility = None
-
- @classmethod
- def _allocate(cls):
- # type: (Type[_Utility]) -> None
- cls._buf = ffi.new("char[]", lib.olm_utility_size())
- cls._utility = lib.olm_utility(cls._buf)
- track_for_finalization(cls, cls._utility, _clear_utility)
-
- @classmethod
- def _check_error(cls, ret, error_class):
- # type: (int, Type) -> None
- if ret != lib.olm_error():
- return
-
- raise error_class("{}".format(
- ffi.string(lib.olm_utility_last_error(
- cls._utility)).decode("utf-8")))
-
- @classmethod
- def _ed25519_verify(cls, key, message, signature):
- # type: (Type[_Utility], AnyStr, AnyStr, AnyStr) -> None
- if not cls._utility:
- cls._allocate()
-
- byte_key = to_bytes(key)
- byte_message = to_bytearray(message)
- byte_signature = to_bytearray(signature)
-
- try:
- ret = lib.olm_ed25519_verify(
- cls._utility,
- byte_key,
- len(byte_key),
- ffi.from_buffer(byte_message),
- len(byte_message),
- ffi.from_buffer(byte_signature),
- len(byte_signature)
- )
-
- cls._check_error(ret, OlmVerifyError)
-
- finally:
- # clear out copies of the message, which may be a plaintext
- if byte_message is not message:
- for i in range(0, len(byte_message)):
- byte_message[i] = 0
-
- @classmethod
- def _sha256(cls, input):
- # type: (Type[_Utility], AnyStr) -> str
- if not cls._utility:
- cls._allocate()
-
- byte_input = to_bytes(input)
- hash_length = lib.olm_sha256_length(cls._utility)
- hash = ffi.new("char[]", hash_length)
-
- ret = lib.olm_sha256(cls._utility, byte_input, len(byte_input),
- hash, hash_length)
-
- cls._check_error(ret, OlmHashError)
-
- return bytes_to_native_str(ffi.unpack(hash, hash_length))
-
-
-def ed25519_verify(key, message, signature):
- # type: (AnyStr, AnyStr, AnyStr) -> None
- """Verify an ed25519 signature.
-
- Raises an OlmVerifyError if verification fails.
-
- Args:
- key(str): The ed25519 public key used for signing.
- message(str): The signed message.
- signature(bytes): The message signature.
- """
- return _Utility._ed25519_verify(key, message, signature)
-
-
-def sha256(input_string):
- # type: (AnyStr) -> str
- """Calculate the SHA-256 hash of the input and encodes it as base64.
-
- Args:
- input_string(str): The input for which the hash will be calculated.
-
- """
- return _Utility._sha256(input_string)