aboutsummaryrefslogtreecommitdiff
path: root/python/olm
diff options
context:
space:
mode:
Diffstat (limited to 'python/olm')
-rw-r--r--python/olm/__init__.py26
-rw-r--r--python/olm/__version__.py9
-rw-r--r--python/olm/_compat.py24
-rw-r--r--python/olm/_finalize.py65
-rw-r--r--python/olm/account.py239
-rw-r--r--python/olm/group_session.py467
-rw-r--r--python/olm/session.py452
-rw-r--r--python/olm/utility.py91
8 files changed, 1373 insertions, 0 deletions
diff --git a/python/olm/__init__.py b/python/olm/__init__.py
new file mode 100644
index 0000000..b52c88b
--- /dev/null
+++ b/python/olm/__init__.py
@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+# libolm python bindings
+# Copyright © 2015-2017 OpenMarket Ltd
+# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
+"""
+Olm Python bindings
+~~~~~~~~~~~~~~~~~~~~~
+| This package implements python bindings for the libolm C library.
+| © Copyright 2015-2017 by OpenMarket Ltd
+| © Copyright 2018 by Damir Jelić
+"""
+from .utility import ed25519_verify, OlmVerifyError
+from .account import Account, OlmAccountError
+from .session import (
+ Session,
+ InboundSession,
+ OutboundSession,
+ OlmSessionError,
+ OlmMessage,
+ OlmPreKeyMessage
+)
+from .group_session import (
+ InboundGroupSession,
+ OutboundGroupSession,
+ OlmGroupSessionError
+)
diff --git a/python/olm/__version__.py b/python/olm/__version__.py
new file mode 100644
index 0000000..dccfdd0
--- /dev/null
+++ b/python/olm/__version__.py
@@ -0,0 +1,9 @@
+__title__ = "python-olm"
+__description__ = ("python CFFI bindings for the olm "
+ "cryptographic ratchet library")
+__url__ = "https://github.com/poljar/python-olm"
+__version__ = "0.1"
+__author__ = "Damir Jelić"
+__author_email__ = "poljar@termina.org.uk"
+__license__ = "Apache 2.0"
+__copyright__ = "Copyright 2018 Damir Jelić"
diff --git a/python/olm/_compat.py b/python/olm/_compat.py
new file mode 100644
index 0000000..67d312b
--- /dev/null
+++ b/python/olm/_compat.py
@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+# libolm python bindings
+# Copyright © 2015-2017 OpenMarket Ltd
+# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
+
+from builtins import bytes, str
+from typing import AnyStr
+
+try:
+ import secrets
+ URANDOM = secrets.token_bytes # pragma: no cover
+except ImportError: # pragma: no cover
+ from os import urandom
+ URANDOM = urandom # type: ignore
+
+
+def to_bytes(string):
+ # type: (AnyStr) -> bytes
+ if isinstance(string, bytes):
+ return string
+ elif isinstance(string, str):
+ return bytes(string, "utf-8")
+
+ raise TypeError("Invalid type {}".format(type(string)))
diff --git a/python/olm/_finalize.py b/python/olm/_finalize.py
new file mode 100644
index 0000000..9f467bc
--- /dev/null
+++ b/python/olm/_finalize.py
@@ -0,0 +1,65 @@
+# The MIT License (MIT)
+# Copyright (c) 2010 Benjamin Peterson <benjamin@python.org>
+
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
+# OR OTHER DEALINGS IN THE SOFTWARE.
+
+"""Finalization with weakrefs
+
+This is designed for avoiding __del__.
+"""
+from __future__ import print_function
+
+import sys
+import traceback
+import weakref
+
+__author__ = "Benjamin Peterson <benjamin@python.org>"
+
+
+class OwnerRef(weakref.ref):
+ """A simple weakref.ref subclass, so attributes can be added."""
+ pass
+
+
+def _run_finalizer(ref):
+ """Internal weakref callback to run finalizers"""
+ del _finalize_refs[id(ref)]
+ finalizer = ref.finalizer
+ item = ref.item
+ try:
+ finalizer(item)
+ except Exception: # pragma: no cover
+ print("Exception running {}:".format(finalizer), file=sys.stderr)
+ traceback.print_exc()
+
+
+_finalize_refs = {}
+
+
+def track_for_finalization(owner, item, finalizer):
+ """Register an object for finalization.
+
+ ``owner`` is the the object which is responsible for ``item``.
+ ``finalizer`` will be called with ``item`` as its only argument when
+ ``owner`` is destroyed by the garbage collector.
+ """
+ ref = OwnerRef(owner, _run_finalizer)
+ ref.item = item
+ ref.finalizer = finalizer
+ _finalize_refs[id(ref)] = ref
diff --git a/python/olm/account.py b/python/olm/account.py
new file mode 100644
index 0000000..5705fe3
--- /dev/null
+++ b/python/olm/account.py
@@ -0,0 +1,239 @@
+# -*- coding: utf-8 -*-
+# libolm python bindings
+# Copyright © 2015-2017 OpenMarket Ltd
+# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
+"""libolm Account module.
+
+This module contains the account part of the Olm library. It contains a single
+Account class which handles the creation of new accounts as well as the storing
+and restoring of them.
+
+Examples:
+ >>> acc = Account()
+ >>> account.identity_keys()
+ >>> account.generate_one_time_keys(1)
+
+"""
+
+import json
+# pylint: disable=redefined-builtin,unused-import
+from builtins import bytes, super
+from typing import AnyStr, Dict, Optional, Type
+
+from future.utils import bytes_to_native_str
+
+# pylint: disable=no-name-in-module
+from _libolm import ffi, lib # type: ignore
+
+from ._compat import URANDOM, to_bytes
+from ._finalize import track_for_finalization
+
+# This is imported only for type checking purposes
+if False:
+ from .session import Session # pragma: no cover
+
+
+def _clear_account(account):
+ # type: (ffi.cdata) -> None
+ lib.olm_clear_account(account)
+
+
+class OlmAccountError(Exception):
+ """libolm Account error exception."""
+
+
+class Account(object):
+ """libolm Account class."""
+
+ def __new__(cls):
+ # type: (Type[Account]) -> Account
+ obj = super().__new__(cls)
+ obj._buf = ffi.new("char[]", lib.olm_account_size())
+ obj._account = lib.olm_account(obj._buf)
+ track_for_finalization(obj, obj._account, _clear_account)
+ return obj
+
+ def __init__(self):
+ # type: () -> None
+ """Create a new Olm account.
+
+ Creates a new account and its matching identity key pair.
+
+ Raises OlmAccountError on failure. If there weren't enough random bytes
+ for the account creation the error message for the exception will be
+ NOT_ENOUGH_RANDOM.
+ """
+ # This is needed to silence mypy not knowing the type of _account.
+ # There has to be a better way for this.
+ if False: # pragma: no cover
+ self._account = self._account # type: ffi.cdata
+
+ random_length = lib.olm_create_account_random_length(self._account)
+ random = URANDOM(random_length)
+ random_buffer = ffi.new("char[]", random)
+
+ self._check_error(
+ lib.olm_create_account(self._account, random_buffer,
+ random_length))
+
+ def _check_error(self, ret):
+ # type: (int) -> None
+ if ret != lib.olm_error():
+ return
+
+ last_error = bytes_to_native_str(
+ ffi.string((lib.olm_account_last_error(self._account))))
+
+ raise OlmAccountError(last_error)
+
+ def pickle(self, passphrase=""):
+ # type: (Optional[str]) -> bytes
+ """Store an Olm account.
+
+ Stores an account as a base64 string. Encrypts the account using the
+ supplied passphrase. Returns a byte object containing the base64
+ encoded string of the pickled account. Raises OlmAccountError on
+ failure.
+
+ Args:
+ passphrase(str, optional): The passphrase to be used to encrypt
+ the account.
+ """
+ byte_key = bytes(passphrase, "utf-8") if passphrase else b""
+ key_buffer = ffi.new("char[]", byte_key)
+
+ pickle_length = lib.olm_pickle_account_length(self._account)
+ pickle_buffer = ffi.new("char[]", pickle_length)
+
+ self._check_error(
+ lib.olm_pickle_account(self._account, key_buffer, len(byte_key),
+ pickle_buffer, pickle_length))
+ return ffi.unpack(pickle_buffer, pickle_length)
+
+ @classmethod
+ def from_pickle(cls, pickle, passphrase=""):
+ # type: (bytes, Optional[str]) -> Account
+ """Load a previously stored olm account.
+
+ Loads an account from a pickled base64-encoded string and returns an
+ Account object. Decrypts the account using the supplied passphrase.
+ Raises OlmAccountError on failure. If the passphrase doesn't match the
+ one used to encrypt the account then the error message for the
+ exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded
+ then the error message will be "INVALID_BASE64".
+
+ Args:
+ pickle(bytes): Base64 encoded byte string containing the pickled
+ account
+ passphrase(str, optional): The passphrase used to encrypt the
+ account.
+ """
+ if not pickle:
+ raise ValueError("Pickle can't be empty")
+
+ byte_key = bytes(passphrase, "utf-8") if passphrase else b""
+ key_buffer = ffi.new("char[]", byte_key)
+ pickle_buffer = ffi.new("char[]", pickle)
+
+ obj = cls.__new__(cls)
+
+ ret = lib.olm_unpickle_account(obj._account, key_buffer, len(byte_key),
+ pickle_buffer, len(pickle))
+ obj._check_error(ret)
+
+ return obj
+
+ @property
+ def identity_keys(self):
+ # type: () -> Dict[str, str]
+ """dict: Public part of the identity keys of the account."""
+ out_length = lib.olm_account_identity_keys_length(self._account)
+ out_buffer = ffi.new("char[]", out_length)
+
+ self._check_error(
+ lib.olm_account_identity_keys(self._account, out_buffer,
+ out_length))
+ return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8"))
+
+ def sign(self, message):
+ # type: (AnyStr) -> str
+ """Signs a message with this account.
+
+ Signs a message with the private ed25519 identity key of this account.
+ Returns the signature.
+ Raises OlmAccountError on failure.
+
+ Args:
+ message(str): The message to sign.
+ """
+ bytes_message = to_bytes(message)
+ out_length = lib.olm_account_signature_length(self._account)
+ message_buffer = ffi.new("char[]", bytes_message)
+ out_buffer = ffi.new("char[]", out_length)
+
+ self._check_error(
+ lib.olm_account_sign(self._account, message_buffer,
+ len(bytes_message), out_buffer, out_length))
+
+ return bytes_to_native_str(ffi.unpack(out_buffer, out_length))
+
+ @property
+ def max_one_time_keys(self):
+ # type: () -> int
+ """int: The maximum number of one-time keys the account can store."""
+ return lib.olm_account_max_number_of_one_time_keys(self._account)
+
+ def mark_keys_as_published(self):
+ # type: () -> None
+ """Mark the current set of one-time keys as being published."""
+ lib.olm_account_mark_keys_as_published(self._account)
+
+ def generate_one_time_keys(self, count):
+ # type: (int) -> None
+ """Generate a number of new one-time keys.
+
+ If the total number of keys stored by this account exceeds
+ max_one_time_keys() then the old keys are discarded.
+ Raises OlmAccountError on error. If the number of random bytes is
+ too small then the error message of the exception will be
+ NOT_ENOUGH_RANDOM.
+
+ Args:
+ count(int): The number of keys to generate.
+ """
+ random_length = lib.olm_account_generate_one_time_keys_random_length(
+ self._account, count)
+ random = URANDOM(random_length)
+ random_buffer = ffi.new("char[]", random)
+ self._check_error(
+ lib.olm_account_generate_one_time_keys(
+ self._account, count, random_buffer, random_length))
+
+ @property
+ def one_time_keys(self):
+ # type: () -> Dict[str, Dict[str, str]]
+ """dict: The public part of the one-time keys for this account."""
+ out_length = lib.olm_account_one_time_keys_length(self._account)
+ out_buffer = ffi.new("char[]", out_length)
+
+ self._check_error(
+ lib.olm_account_one_time_keys(self._account, out_buffer,
+ out_length))
+
+ return json.loads(ffi.unpack(out_buffer, out_length).decode("utf-8"))
+
+ def remove_one_time_keys(self, session):
+ # type: (Session) -> None
+ """Remove used one-time keys.
+
+ Removes the one-time keys that the session used from the account.
+ Raises OlmAccountError on failure. If the account doesn't have any
+ matching one-time keys then the error message of the exception will be
+ "BAD_MESSAGE_KEY_ID".
+
+ Args:
+ session(Session): An Olm Session object that was created with this
+ account.
+ """
+ self._check_error(lib.olm_remove_one_time_keys(self._account,
+ session._session))
diff --git a/python/olm/group_session.py b/python/olm/group_session.py
new file mode 100644
index 0000000..6b13c60
--- /dev/null
+++ b/python/olm/group_session.py
@@ -0,0 +1,467 @@
+# -*- coding: utf-8 -*-
+# libolm python bindings
+# Copyright © 2015-2017 OpenMarket Ltd
+# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
+"""libolm Group session module.
+
+This module contains the group session part of the Olm library. It contains two
+classes for creating inbound and outbound group sessions.
+
+Examples:
+ >>> outbound = OutboundGroupSession()
+ >>> InboundGroupSession(outbound.session_key)
+"""
+
+# pylint: disable=redefined-builtin,unused-import
+from builtins import bytes, super
+from typing import AnyStr, Optional, Tuple, Type
+
+from future.utils import bytes_to_native_str
+
+# pylint: disable=no-name-in-module
+from _libolm import ffi, lib # type: ignore
+
+from ._compat import URANDOM, to_bytes
+from ._finalize import track_for_finalization
+
+
+def _clear_inbound_group_session(session):
+ # type: (ffi.cdata) -> None
+ lib.olm_clear_inbound_group_session(session)
+
+
+def _clear_outbound_group_session(session):
+ # type: (ffi.cdata) -> None
+ lib.olm_clear_outbound_group_session(session)
+
+
+class OlmGroupSessionError(Exception):
+ """libolm Group session error exception."""
+
+
+class InboundGroupSession(object):
+ """Inbound group session for encrypted multiuser communication."""
+
+ def __new__(
+ cls, # type: Type[InboundGroupSession]
+ session_key=None # type: Optional[str]
+ ):
+ # type: (...) -> InboundGroupSession
+ obj = super().__new__(cls)
+ obj._buf = ffi.new("char[]", lib.olm_inbound_group_session_size())
+ obj._session = lib.olm_inbound_group_session(obj._buf)
+ track_for_finalization(obj, obj._session, _clear_inbound_group_session)
+ return obj
+
+ def __init__(self, session_key):
+ # type: (AnyStr) -> None
+ """Create a new inbound group session.
+ Start a new inbound group session, from a key exported from
+ an outbound group session.
+
+ Raises OlmGroupSessionError on failure. The error message of the
+ exception will be "OLM_INVALID_BASE64" if the session key is not valid
+ base64 and "OLM_BAD_SESSION_KEY" if the session key is invalid.
+ """
+ if False: # pragma: no cover
+ self._session = self._session # type: ffi.cdata
+
+ byte_session_key = to_bytes(session_key)
+
+ key_buffer = ffi.new("char[]", byte_session_key)
+ ret = lib.olm_init_inbound_group_session(
+ self._session, key_buffer, len(byte_session_key)
+ )
+ self._check_error(ret)
+
+ def pickle(self, passphrase=""):
+ # type: (Optional[str]) -> bytes
+ """Store an inbound group session.
+
+ Stores a group session as a base64 string. Encrypts the session using
+ the supplied passphrase. Returns a byte object containing the base64
+ encoded string of the pickled session.
+
+ Args:
+ passphrase(str, optional): The passphrase to be used to encrypt
+ the session.
+ """
+ byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
+
+ passphrase_buffer = ffi.new("char[]", byte_passphrase)
+ pickle_length = lib.olm_pickle_inbound_group_session_length(
+ self._session)
+ pickle_buffer = ffi.new("char[]", pickle_length)
+
+ ret = lib.olm_pickle_inbound_group_session(
+ self._session, passphrase_buffer, len(byte_passphrase),
+ pickle_buffer, pickle_length
+ )
+
+ self._check_error(ret)
+
+ return ffi.unpack(pickle_buffer, pickle_length)
+
+ @classmethod
+ def from_pickle(cls, pickle, passphrase=""):
+ # type: (bytes, Optional[str]) -> InboundGroupSession
+ """Load a previously stored inbound group session.
+
+ Loads an inbound group session from a pickled base64 string and returns
+ an InboundGroupSession object. Decrypts the session using the supplied
+ passphrase. Raises OlmSessionError on failure. If the passphrase
+ doesn't match the one used to encrypt the session then the error
+ message for the exception will be "BAD_ACCOUNT_KEY". If the base64
+ couldn't be decoded then the error message will be "INVALID_BASE64".
+
+ Args:
+ pickle(bytes): Base64 encoded byte string containing the pickled
+ session
+ passphrase(str, optional): The passphrase used to encrypt the
+ session
+ """
+ if not pickle:
+ raise ValueError("Pickle can't be empty")
+
+ byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
+ passphrase_buffer = ffi.new("char[]", byte_passphrase)
+ pickle_buffer = ffi.new("char[]", pickle)
+
+ obj = cls.__new__(cls)
+
+ ret = lib.olm_unpickle_inbound_group_session(
+ obj._session,
+ passphrase_buffer,
+ len(byte_passphrase),
+ pickle_buffer,
+ len(pickle)
+ )
+ obj._check_error(ret)
+
+ return obj
+
+ def _check_error(self, ret):
+ # type: (int) -> None
+ if ret != lib.olm_error():
+ return
+
+ last_error = bytes_to_native_str(ffi.string(
+ lib.olm_inbound_group_session_last_error(self._session)))
+
+ raise OlmGroupSessionError(last_error)
+
+ def decrypt(self, ciphertext):
+ # type: (AnyStr) -> Tuple[str, int]
+ """Decrypt a message
+
+ Returns a tuple of the decrypted plain-text and the message index of
+ the decrypted message or raises OlmGroupSessionError on failure.
+ On failure the error message of the exception will be:
+
+ * OLM_INVALID_BASE64 if the message is not valid base64
+ * OLM_BAD_MESSAGE_VERSION if the message was encrypted with an
+ unsupported version of the protocol
+ * OLM_BAD_MESSAGE_FORMAT if the message headers could not be
+ decoded
+ * OLM_BAD_MESSAGE_MAC if the message could not be verified
+ * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key
+ corresponding to the message's index (i.e., it was sent before
+ the session key was shared with us)
+
+ Args:
+ ciphertext(str): Base64 encoded ciphertext containing the encrypted
+ message
+ """
+ if not ciphertext:
+ raise ValueError("Ciphertext can't be empty.")
+
+ byte_ciphertext = to_bytes(ciphertext)
+
+ ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
+
+ max_plaintext_length = lib.olm_group_decrypt_max_plaintext_length(
+ self._session, ciphertext_buffer, len(byte_ciphertext)
+ )
+ plaintext_buffer = ffi.new("char[]", max_plaintext_length)
+ ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
+
+ message_index = ffi.new("uint32_t*")
+ plaintext_length = lib.olm_group_decrypt(
+ self._session, ciphertext_buffer, len(byte_ciphertext),
+ plaintext_buffer, max_plaintext_length,
+ message_index
+ )
+
+ self._check_error(plaintext_length)
+
+ return bytes_to_native_str(ffi.unpack(
+ plaintext_buffer,
+ plaintext_length
+ )), message_index[0]
+
+ @property
+ def id(self):
+ # type: () -> str
+ """str: A base64 encoded identifier for this session."""
+ id_length = lib.olm_inbound_group_session_id_length(self._session)
+ id_buffer = ffi.new("char[]", id_length)
+ ret = lib.olm_inbound_group_session_id(
+ self._session,
+ id_buffer,
+ id_length
+ )
+ self._check_error(ret)
+ return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
+
+ @property
+ def first_known_index(self):
+ # type: () -> int
+ """int: The first message index we know how to decrypt."""
+ return lib.olm_inbound_group_session_first_known_index(self._session)
+
+ def export_session(self, message_index):
+ # type: (int) -> str
+ """Export an inbound group session
+
+ Export the base64-encoded ratchet key for this session, at the given
+ index, in a format which can be used by import_session().
+
+ Raises OlmGroupSessionError on failure. The error message for the
+ exception will be:
+
+ * OLM_UNKNOWN_MESSAGE_INDEX if we do not have a session key
+ corresponding to the given index (ie, it was sent before the
+ session key was shared with us)
+
+ Args:
+ message_index(int): The message index at which the session should
+ be exported.
+ """
+
+ export_length = lib.olm_export_inbound_group_session_length(
+ self._session)
+
+ export_buffer = ffi.new("char[]", export_length)
+ ret = lib.olm_export_inbound_group_session(
+ self._session,
+ export_buffer,
+ export_length,
+ message_index
+ )
+ self._check_error(ret)
+ return bytes_to_native_str(ffi.unpack(export_buffer, export_length))
+
+ @classmethod
+ def import_session(cls, session_key):
+ # type: (AnyStr) -> InboundGroupSession
+ """Create an InboundGroupSession from an exported session key.
+
+ Creates an InboundGroupSession with an previously exported session key,
+ raises OlmGroupSessionError on failure. The error message for the
+ exception will be:
+
+ * OLM_INVALID_BASE64 if the session_key is not valid base64
+ * OLM_BAD_SESSION_KEY if the session_key is invalid
+
+ Args:
+ session_key(str): The exported session key with which the inbound
+ group session will be created
+ """
+ obj = cls.__new__(cls)
+
+ byte_session_key = to_bytes(session_key)
+
+ key_buffer = ffi.new("char[]", byte_session_key)
+ ret = lib.olm_import_inbound_group_session(
+ obj._session,
+ key_buffer,
+ len(byte_session_key)
+ )
+ obj._check_error(ret)
+
+ return obj
+
+
+class OutboundGroupSession(object):
+ """Outbound group session for encrypted multiuser communication."""
+
+ def __new__(cls):
+ # type: (Type[OutboundGroupSession]) -> OutboundGroupSession
+ obj = super().__new__(cls)
+ obj._buf = ffi.new("char[]", lib.olm_outbound_group_session_size())
+ obj._session = lib.olm_outbound_group_session(obj._buf)
+ track_for_finalization(
+ obj,
+ obj._session,
+ _clear_outbound_group_session
+ )
+ return obj
+
+ def __init__(self):
+ # type: () -> None
+ """Create a new outbound group session.
+
+ Start a new outbound group session. Raises OlmGroupSessionError on
+ failure. If there weren't enough random bytes for the session creation
+ the error message for the exception will be NOT_ENOUGH_RANDOM.
+ """
+ if False: # pragma: no cover
+ self._session = self._session # type: ffi.cdata
+
+ random_length = lib.olm_init_outbound_group_session_random_length(
+ self._session
+ )
+ random = URANDOM(random_length)
+ random_buffer = ffi.new("char[]", random)
+
+ ret = lib.olm_init_outbound_group_session(
+ self._session, random_buffer, random_length
+ )
+ self._check_error(ret)
+
+ def _check_error(self, ret):
+ # type: (int) -> None
+ if ret != lib.olm_error():
+ return
+
+ last_error = bytes_to_native_str(ffi.string(
+ lib.olm_outbound_group_session_last_error(self._session)
+ ))
+
+ raise OlmGroupSessionError(last_error)
+
+ def pickle(self, passphrase=""):
+ # type: (Optional[str]) -> bytes
+ """Store an outbound group session.
+
+ Stores a group session as a base64 string. Encrypts the session using
+ the supplied passphrase. Returns a byte object containing the base64
+ encoded string of the pickled session.
+
+ Args:
+ passphrase(str, optional): The passphrase to be used to encrypt
+ the session.
+ """
+ byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
+ passphrase_buffer = ffi.new("char[]", byte_passphrase)
+ pickle_length = lib.olm_pickle_outbound_group_session_length(
+ self._session)
+ pickle_buffer = ffi.new("char[]", pickle_length)
+
+ ret = lib.olm_pickle_outbound_group_session(
+ self._session, passphrase_buffer, len(byte_passphrase),
+ pickle_buffer, pickle_length
+ )
+ self._check_error(ret)
+ return ffi.unpack(pickle_buffer, pickle_length)
+
+ @classmethod
+ def from_pickle(cls, pickle, passphrase=""):
+ # type: (bytes, Optional[str]) -> OutboundGroupSession
+ """Load a previously stored outbound group session.
+
+ Loads an outbound group session from a pickled base64 string and
+ returns an OutboundGroupSession object. Decrypts the session using the
+ supplied passphrase. Raises OlmSessionError on failure. If the
+ passphrase doesn't match the one used to encrypt the session then the
+ error message for the exception will be "BAD_ACCOUNT_KEY". If the
+ base64 couldn't be decoded then the error message will be
+ "INVALID_BASE64".
+
+ Args:
+ pickle(bytes): Base64 encoded byte string containing the pickled
+ session
+ passphrase(str, optional): The passphrase used to encrypt the
+ """
+ if not pickle:
+ raise ValueError("Pickle can't be empty")
+
+ byte_passphrase = bytes(passphrase, "utf-8") if passphrase else b""
+ passphrase_buffer = ffi.new("char[]", byte_passphrase)
+ pickle_buffer = ffi.new("char[]", pickle)
+
+ obj = cls.__new__(cls)
+
+ ret = lib.olm_unpickle_outbound_group_session(
+ obj._session,
+ passphrase_buffer,
+ len(byte_passphrase),
+ pickle_buffer,
+ len(pickle)
+ )
+ obj._check_error(ret)
+
+ return obj
+
+ def encrypt(self, plaintext):
+ # type: (AnyStr) -> str
+ """Encrypt a message.
+
+ Returns the encrypted ciphertext.
+
+ Args:
+ plaintext(str): A string that will be encrypted using the group
+ session.
+ """
+ byte_plaintext = to_bytes(plaintext)
+ message_length = lib.olm_group_encrypt_message_length(
+ self._session, len(byte_plaintext)
+ )
+
+ message_buffer = ffi.new("char[]", message_length)
+
+ plaintext_buffer = ffi.new("char[]", byte_plaintext)
+
+ ret = lib.olm_group_encrypt(
+ self._session,
+ plaintext_buffer, len(byte_plaintext),
+ message_buffer, message_length,
+ )
+ self._check_error(ret)
+ return bytes_to_native_str(ffi.unpack(message_buffer, message_length))
+
+ @property
+ def id(self):
+ # type: () -> str
+ """str: A base64 encoded identifier for this session."""
+ id_length = lib.olm_outbound_group_session_id_length(self._session)
+ id_buffer = ffi.new("char[]", id_length)
+
+ ret = lib.olm_outbound_group_session_id(
+ self._session,
+ id_buffer,
+ id_length
+ )
+ self._check_error(ret)
+
+ return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
+
+ @property
+ def message_index(self):
+ # type: () -> int
+ """int: The current message index of the session.
+
+ Each message is encrypted with an increasing index. This is the index
+ for the next message.
+ """
+ return lib.olm_outbound_group_session_message_index(self._session)
+
+ @property
+ def session_key(self):
+ # type: () -> str
+ """The base64-encoded current ratchet key for this session.
+
+ Each message is encrypted with a different ratchet key. This function
+ returns the ratchet key that will be used for the next message.
+ """
+ key_length = lib.olm_outbound_group_session_key_length(self._session)
+ key_buffer = ffi.new("char[]", key_length)
+
+ ret = lib.olm_outbound_group_session_key(
+ self._session,
+ key_buffer,
+ key_length
+ )
+ self._check_error(ret)
+
+ return bytes_to_native_str(ffi.unpack(key_buffer, key_length))
diff --git a/python/olm/session.py b/python/olm/session.py
new file mode 100644
index 0000000..b7b2c4b
--- /dev/null
+++ b/python/olm/session.py
@@ -0,0 +1,452 @@
+# -*- coding: utf-8 -*-
+# libolm python bindings
+# Copyright © 2015-2017 OpenMarket Ltd
+# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
+"""libolm Session module.
+
+This module contains the Olm Session part of the Olm library.
+
+It is used to establish a peer-to-peer encrypted communication channel between
+two Olm accounts.
+
+Examples:
+ >>> alice = Account()
+ >>> bob = Account()
+ >>> bob.generate_one_time_keys(1)
+ >>> id_key = bob.identity_keys['curve25519']
+ >>> one_time = list(bob.one_time_keys["curve25519"].values())[0]
+ >>> session = OutboundSession(alice, id_key, one_time)
+
+"""
+
+# pylint: disable=redefined-builtin,unused-import
+from builtins import bytes, super
+from typing import AnyStr, Optional, Type
+
+from future.utils import bytes_to_native_str
+
+# pylint: disable=no-name-in-module
+from _libolm import ffi, lib # type: ignore
+
+from ._compat import URANDOM, to_bytes
+from ._finalize import track_for_finalization
+
+# This is imported only for type checking purposes
+if False:
+ from .account import Account # pragma: no cover
+
+
+class OlmSessionError(Exception):
+ """libolm Session exception."""
+
+
+class _OlmMessage(object):
+ def __init__(self, ciphertext, message_type):
+ # type: (AnyStr, ffi.cdata) -> None
+ if not ciphertext:
+ raise ValueError("Ciphertext can't be empty")
+
+ # I don't know why mypy wants a type annotation here nor why AnyStr
+ # doesn't work
+ self.ciphertext = ciphertext # type: ignore
+ self.message_type = message_type
+
+ def __str__(self):
+ # type: () -> str
+ type_to_prefix = {
+ lib.OLM_MESSAGE_TYPE_PRE_KEY: "PRE_KEY",
+ lib.OLM_MESSAGE_TYPE_MESSAGE: "MESSAGE"
+ }
+
+ prefix = type_to_prefix[self.message_type]
+ return "{} {}".format(prefix, self.ciphertext)
+
+
+class OlmPreKeyMessage(_OlmMessage):
+ """Olm prekey message class
+
+ Prekey messages are used to establish an Olm session. After the first
+ message exchange the session switches to normal messages
+ """
+
+ def __init__(self, ciphertext):
+ # type: (AnyStr) -> None
+ """Create a new Olm prekey message with the supplied ciphertext
+
+ Args:
+ ciphertext(str): The ciphertext of the prekey message.
+ """
+ _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_PRE_KEY)
+
+ def __repr__(self):
+ # type: () -> str
+ return "OlmPreKeyMessage({})".format(self.ciphertext)
+
+
+class OlmMessage(_OlmMessage):
+ """Olm message class"""
+
+ def __init__(self, ciphertext):
+ # type: (AnyStr) -> None
+ """Create a new Olm message with the supplied ciphertext
+
+ Args:
+ ciphertext(str): The ciphertext of the message.
+ """
+ _OlmMessage.__init__(self, ciphertext, lib.OLM_MESSAGE_TYPE_MESSAGE)
+
+ def __repr__(self):
+ # type: () -> str
+ return "OlmMessage({})".format(self.ciphertext)
+
+
+def _clear_session(session):
+ # type: (ffi.cdata) -> None
+ lib.olm_clear_session(session)
+
+
+class Session(object):
+ """libolm Session class.
+ This is an abstract class that can't be instantiated except when unpickling
+ a previously pickled InboundSession or OutboundSession object with
+ from_pickle.
+ """
+
+ def __new__(cls):
+ # type: (Type[Session]) -> Session
+
+ obj = super().__new__(cls)
+ obj._buf = ffi.new("char[]", lib.olm_session_size())
+ obj._session = lib.olm_session(obj._buf)
+ track_for_finalization(obj, obj._session, _clear_session)
+ return obj
+
+ def __init__(self):
+ # type: () -> None
+ if type(self) is Session:
+ raise TypeError("Session class may not be instantiated.")
+
+ if False:
+ self._session = self._session # type: ffi.cdata
+
+ def _check_error(self, ret):
+ # type: (int) -> None
+ if ret != lib.olm_error():
+ return
+
+ last_error = bytes_to_native_str(
+ ffi.string(lib.olm_session_last_error(self._session)))
+
+ raise OlmSessionError(last_error)
+
+ def pickle(self, passphrase=""):
+ # type: (Optional[str]) -> bytes
+ """Store an Olm session.
+
+ Stores a session as a base64 string. Encrypts the session using the
+ supplied passphrase. Returns a byte object containing the base64
+ encoded string of the pickled session. Raises OlmSessionError on
+ failure.
+
+ Args:
+ passphrase(str, optional): The passphrase to be used to encrypt
+ the session.
+ """
+ byte_key = bytes(passphrase, "utf-8") if passphrase else b""
+ key_buffer = ffi.new("char[]", byte_key)
+
+ pickle_length = lib.olm_pickle_session_length(self._session)
+ pickle_buffer = ffi.new("char[]", pickle_length)
+
+ self._check_error(
+ lib.olm_pickle_session(self._session, key_buffer, len(byte_key),
+ pickle_buffer, pickle_length))
+ return ffi.unpack(pickle_buffer, pickle_length)
+
+ @classmethod
+ def from_pickle(cls, pickle, passphrase=""):
+ # type: (bytes, Optional[str]) -> Session
+ """Load a previously stored Olm session.
+
+ Loads a session from a pickled base64 string and returns a Session
+ object. Decrypts the session using the supplied passphrase. Raises
+ OlmSessionError on failure. If the passphrase doesn't match the one
+ used to encrypt the session then the error message for the
+ exception will be "BAD_ACCOUNT_KEY". If the base64 couldn't be decoded
+ then the error message will be "INVALID_BASE64".
+
+ Args:
+ pickle(bytes): Base64 encoded byte string containing the pickled
+ session
+ passphrase(str, optional): The passphrase used to encrypt the
+ session.
+ """
+ if not pickle:
+ raise ValueError("Pickle can't be empty")
+
+ byte_key = bytes(passphrase, "utf-8") if passphrase else b""
+ key_buffer = ffi.new("char[]", byte_key)
+ pickle_buffer = ffi.new("char[]", pickle)
+
+ session = cls.__new__(cls)
+
+ ret = lib.olm_unpickle_session(session._session, key_buffer,
+ len(byte_key), pickle_buffer,
+ len(pickle))
+ session._check_error(ret)
+
+ return session
+
+ def encrypt(self, plaintext):
+ # type: (AnyStr) -> _OlmMessage
+ """Encrypts a message using the session. Returns the ciphertext as a
+ base64 encoded string on success. Raises OlmSessionError on failure. If
+ there weren't enough random bytes to encrypt the message the error
+ message for the exception will be NOT_ENOUGH_RANDOM.
+
+ Args:
+ plaintext(str): The plaintext message that will be encrypted.
+ """
+ byte_plaintext = to_bytes(plaintext)
+
+ r_length = lib.olm_encrypt_random_length(self._session)
+ random = URANDOM(r_length)
+ random_buffer = ffi.new("char[]", random)
+
+ message_type = lib.olm_encrypt_message_type(self._session)
+
+ self._check_error(message_type)
+
+ ciphertext_length = lib.olm_encrypt_message_length(
+ self._session, len(plaintext)
+ )
+ ciphertext_buffer = ffi.new("char[]", ciphertext_length)
+
+ plaintext_buffer = ffi.new("char[]", byte_plaintext)
+
+ self._check_error(lib.olm_encrypt(
+ self._session,
+ plaintext_buffer, len(byte_plaintext),
+ random_buffer, r_length,
+ ciphertext_buffer, ciphertext_length,
+ ))
+
+ if message_type == lib.OLM_MESSAGE_TYPE_PRE_KEY:
+ return OlmPreKeyMessage(
+ bytes_to_native_str(ffi.unpack(
+ ciphertext_buffer,
+ ciphertext_length
+ )))
+ elif message_type == lib.OLM_MESSAGE_TYPE_MESSAGE:
+ return OlmMessage(
+ bytes_to_native_str(ffi.unpack(
+ ciphertext_buffer,
+ ciphertext_length
+ )))
+ else: # pragma: no cover
+ raise ValueError("Unknown message type")
+
+ def decrypt(self, message):
+ # type: (_OlmMessage) -> str
+ """Decrypts a message using the session. Returns the plaintext string
+ on success. Raises OlmSessionError on failure. If the base64 couldn't
+ be decoded then the error message will be "INVALID_BASE64". If the
+ message is for an unsupported version of the protocol the error message
+ will be "BAD_MESSAGE_VERSION". If the message couldn't be decoded then
+ the error message will be "BAD_MESSAGE_FORMAT". If the MAC on the
+ message was invalid then the error message will be "BAD_MESSAGE_MAC".
+
+ Args:
+ message(OlmMessage): The Olm message that will be decrypted. It can
+ be either an OlmPreKeyMessage or an OlmMessage.
+ """
+ if not message.ciphertext:
+ raise ValueError("Ciphertext can't be empty")
+
+ byte_ciphertext = to_bytes(message.ciphertext)
+ ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
+
+ max_plaintext_length = lib.olm_decrypt_max_plaintext_length(
+ self._session, message.message_type, ciphertext_buffer,
+ len(byte_ciphertext)
+ )
+ plaintext_buffer = ffi.new("char[]", max_plaintext_length)
+ ciphertext_buffer = ffi.new("char[]", byte_ciphertext)
+ plaintext_length = lib.olm_decrypt(
+ self._session, message.message_type, ciphertext_buffer,
+ len(byte_ciphertext), plaintext_buffer, max_plaintext_length
+ )
+ self._check_error(plaintext_length)
+ return bytes_to_native_str(
+ ffi.unpack(plaintext_buffer, plaintext_length))
+
+ @property
+ def id(self):
+ # type: () -> str
+ """str: An identifier for this session. Will be the same for both
+ ends of the conversation.
+ """
+ id_length = lib.olm_session_id_length(self._session)
+ id_buffer = ffi.new("char[]", id_length)
+
+ self._check_error(
+ lib.olm_session_id(self._session, id_buffer, id_length)
+ )
+ return bytes_to_native_str(ffi.unpack(id_buffer, id_length))
+
+ def matches(self, message, identity_key=None):
+ # type: (OlmPreKeyMessage, Optional[AnyStr]) -> bool
+ """Checks if the PRE_KEY message is for this in-bound session.
+ This can happen if multiple messages are sent to this session before
+ this session sends a message in reply. Returns True if the session
+ matches. Returns False if the session does not match. Raises
+ OlmSessionError on failure. If the base64 couldn't be decoded then the
+ error message will be "INVALID_BASE64". If the message was for an
+ unsupported protocol version then the error message will be
+ "BAD_MESSAGE_VERSION". If the message couldn't be decoded then then the
+ error message will be * "BAD_MESSAGE_FORMAT".
+
+ Args:
+ message(OlmPreKeyMessage): The Olm prekey message that will checked
+ if it is intended for this session.
+ identity_key(str, optional): The identity key of the sender. To
+ check if the message was also sent using this identity key.
+ """
+ if not isinstance(message, OlmPreKeyMessage):
+ raise TypeError("Matches can only be called with prekey messages.")
+
+ if not message.ciphertext:
+ raise ValueError("Ciphertext can't be empty")
+
+ ret = None
+
+ byte_ciphertext = to_bytes(message.ciphertext)
+
+ message_buffer = ffi.new("char[]", byte_ciphertext)
+
+ if identity_key:
+ byte_id_key = to_bytes(identity_key)
+ identity_key_buffer = ffi.new("char[]", byte_id_key)
+
+ ret = lib.olm_matches_inbound_session_from(
+ self._session,
+ identity_key_buffer, len(byte_id_key),
+ message_buffer, len(byte_ciphertext)
+ )
+
+ else:
+ ret = lib.olm_matches_inbound_session(
+ self._session,
+ message_buffer, len(byte_ciphertext))
+
+ self._check_error(ret)
+
+ return bool(ret)
+
+
+class InboundSession(Session):
+ """Inbound Olm session for p2p encrypted communication.
+ """
+
+ def __new__(cls, account, message, identity_key=None):
+ # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> Session
+ return super().__new__(cls)
+
+ def __init__(self, account, message, identity_key=None):
+ # type: (Account, OlmPreKeyMessage, Optional[AnyStr]) -> None
+ """Create a new inbound Olm session.
+
+ Create a new in-bound session for sending/receiving messages from an
+ incoming prekey message. Raises OlmSessionError on failure. If the
+ base64 couldn't be decoded then error message will be "INVALID_BASE64".
+ If the message was for an unsupported protocol version then
+ the errror message will be "BAD_MESSAGE_VERSION". If the message
+ couldn't be decoded then then the error message will be
+ "BAD_MESSAGE_FORMAT". If the message refers to an unknown one-time
+ key then the error message will be "BAD_MESSAGE_KEY_ID".
+
+ Args:
+ account(Account): The Olm Account that will be used to create this
+ session.
+ message(OlmPreKeyMessage): The Olm prekey message that will checked
+ that will be used to create this session.
+ identity_key(str, optional): The identity key of the sender. To
+ check if the message was also sent using this identity key.
+ """
+ if not message.ciphertext:
+ raise ValueError("Ciphertext can't be empty")
+
+ super().__init__()
+ byte_ciphertext = to_bytes(message.ciphertext)
+ message_buffer = ffi.new("char[]", byte_ciphertext)
+
+ if identity_key:
+ byte_id_key = to_bytes(identity_key)
+ identity_key_buffer = ffi.new("char[]", byte_id_key)
+ self._check_error(lib.olm_create_inbound_session_from(
+ self._session,
+ account._account,
+ identity_key_buffer, len(byte_id_key),
+ message_buffer, len(byte_ciphertext)
+ ))
+ else:
+ self._check_error(lib.olm_create_inbound_session(
+ self._session,
+ account._account,
+ message_buffer, len(byte_ciphertext)
+ ))
+
+
+class OutboundSession(Session):
+ """Outbound Olm session for p2p encrypted communication."""
+
+ def __new__(cls, account, identity_key, one_time_key):
+ # type: (Account, AnyStr, AnyStr) -> Session
+ return super().__new__(cls)
+
+ def __init__(self, account, identity_key, one_time_key):
+ # type: (Account, AnyStr, AnyStr) -> None
+ """Create a new outbound Olm session.
+
+ Creates a new outbound session for sending messages to a given
+ identity key and one-time key.
+
+ Raises OlmSessionError on failure. If the keys couldn't be decoded as
+ base64 then the error message will be "INVALID_BASE64". If there
+ weren't enough random bytes for the session creation the error message
+ for the exception will be NOT_ENOUGH_RANDOM.
+
+ Args:
+ account(Account): The Olm Account that will be used to create this
+ session.
+ identity_key(str): The identity key of the person with whom we want
+ to start the session.
+ one_time_key(str): A one-time key from the person with whom we want
+ to start the session.
+ """
+ if not identity_key:
+ raise ValueError("Identity key can't be empty")
+
+ if not one_time_key:
+ raise ValueError("One-time key can't be empty")
+
+ super().__init__()
+
+ byte_id_key = to_bytes(identity_key)
+ byte_one_time = to_bytes(one_time_key)
+
+ session_random_length = lib.olm_create_outbound_session_random_length(
+ self._session)
+
+ random = URANDOM(session_random_length)
+ random_buffer = ffi.new("char[]", random)
+ identity_key_buffer = ffi.new("char[]", byte_id_key)
+ one_time_key_buffer = ffi.new("char[]", byte_one_time)
+
+ self._check_error(lib.olm_create_outbound_session(
+ self._session,
+ account._account,
+ identity_key_buffer, len(byte_id_key),
+ one_time_key_buffer, len(byte_one_time),
+ random_buffer, session_random_length
+ ))
diff --git a/python/olm/utility.py b/python/olm/utility.py
new file mode 100644
index 0000000..838cf3f
--- /dev/null
+++ b/python/olm/utility.py
@@ -0,0 +1,91 @@
+# -*- coding: utf-8 -*-
+# libolm python bindings
+# Copyright © 2015-2017 OpenMarket Ltd
+# Copyright © 2018 Damir Jelić <poljar@termina.org.uk>
+"""libolm Utility module.
+
+This module contains utilities for olm.
+It only contains the ed25519_verify function for signature verification.
+
+Examples:
+ >>> alice = Account()
+
+ >>> message = "Test"
+ >>> signature = alice.sign(message)
+ >>> signing_key = alice.identity_keys["ed25519"]
+
+ >>> ed25519_verify(signing_key, message, signature)
+
+"""
+
+# pylint: disable=redefined-builtin,unused-import
+from typing import AnyStr, Type
+
+# pylint: disable=no-name-in-module
+from _libolm import ffi, lib # type: ignore
+
+from ._compat import to_bytes
+from ._finalize import track_for_finalization
+
+
+def _clear_utility(utility): # pragma: no cover
+ # type: (ffi.cdata) -> None
+ lib.olm_clear_utility(utility)
+
+
+class OlmVerifyError(Exception):
+ """libolm signature verification exception."""
+
+
+class _Utility(object):
+ # pylint: disable=too-few-public-methods
+ """libolm Utility class."""
+
+ _buf = None
+ _utility = None
+
+ @classmethod
+ def _allocate(cls):
+ # type: (Type[_Utility]) -> None
+ cls._buf = ffi.new("char[]", lib.olm_utility_size())
+ cls._utility = lib.olm_utility(cls._buf)
+ track_for_finalization(cls, cls._utility, _clear_utility)
+
+ @classmethod
+ def _check_error(cls, ret):
+ # type: (int) -> None
+ if ret != lib.olm_error():
+ return
+
+ raise OlmVerifyError("{}".format(
+ ffi.string(lib.olm_utility_last_error(
+ cls._utility)).decode("utf-8")))
+
+ @classmethod
+ def _ed25519_verify(cls, key, message, signature):
+ # type: (Type[_Utility], AnyStr, AnyStr, AnyStr) -> None
+ if not cls._utility:
+ cls._allocate()
+
+ byte_key = to_bytes(key)
+ byte_message = to_bytes(message)
+ byte_signature = to_bytes(signature)
+
+ cls._check_error(
+ lib.olm_ed25519_verify(cls._utility, byte_key, len(byte_key),
+ byte_message, len(byte_message),
+ byte_signature, len(byte_signature)))
+
+
+def ed25519_verify(key, message, signature):
+ # type: (AnyStr, AnyStr, AnyStr) -> None
+ """Verify an ed25519 signature.
+
+ Raises an OlmVerifyError if verification fails.
+
+ Args:
+ key(str): The ed25519 public key used for signing.
+ message(str): The signed message.
+ signature(bytes): The message signature.
+ """
+ return _Utility._ed25519_verify(key, message, signature)