%
% Copyright (c) 2024 Andrea Biscuola
%
% Permission to use, copy, modify, and distribute this software for any
% purpose with or without fee is hereby granted, provided that the above
% copyright notice and this permission notice appear in all copies.
%
% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
%
-module(storage).
-moduledoc """
Object storage module

This module provides functions to interact with the storage application.

Functionality includes reading and writing records, creating and deleting
buckets, and managing their size and replication across a cluster.

Objects stored in a bucket are defined as:

```
-record(object, {key :: term(),
                 value :: term(),
                 tags = [] :: list(term()),
                 metadata = [] :: list({term(), term()})}).
```

To import the object definition in your module, include the "storage.hrl"
file:

```
-include_lib("storage/include/storage.hrl").
```
""".

-behaviour(gen_server).

% Server start function
-export([start_link/0]).

% Public bucket operation functions
-export([add_node/1,
         add_replica/2,
         add_replica/3,
         create/1,
         create/2,
         create/3,
         delete_replica/2,
         grow/1,
         grow/2,
         grow/3,
         move/3,
         remove/1,
         remove/2,
         shrink/1,
         shrink/2]).

% Public record operation functions
-export([delete/2,
         list/2,
         read/2,
         write/3,
         write/4]).

% Module callbacks
-export([code_change/3,
         init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2]).

-include_lib("stdlib/include/qlc.hrl").
-include_lib("storage/include/storage.hrl").

% Name of the MIME definition file, appended to the application priv dir.
-define(MIMEFILE, "/mimes.txt").

% MIME type recorded for objects whose key extension is not found in the
% MIME table. The previous value was misspelled ("octect"); objects written
% before this fix still carry the misspelled value in their metadata.
-define(DEFMIME, "application/octet-stream").
-type object() :: #object{}.
-type metadata() :: [{atom(), term()}].

-export_type([object/0, metadata/0]).

% Row of the MIME lookup table: `ext` is the key, `mtype` the MIME type.
-record(mime, {ext, mtype}).

%
% Start functions.
%

% Start the storage server, registered locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).

%
% Buckets management functions.
%

-doc """
Add a node to the storage cluster.

Spec:

```
-spec add_node(Node) -> ok | {error, Reason} when
    Node :: atom(),
    Reason :: term().
```

Example:

```
add_node('b@piwa').
ok
```

Bootstraps the node `b@piwa` and adds it to the storage cluster.
""".
-spec add_node(Node) -> ok | {error, Reason} when
    Node :: atom(),
    Reason :: term().
add_node(Node) ->
    Request = {node, Node},
    gen_server:call(?MODULE, Request).

-doc """
Create a replica of the given `Bucket` on `Node`.

Spec:

```
-spec add_replica(Bucket, Node, Type) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Type :: disc_copies | disc_only_copies | ram_copies,
    Reason :: term().
```

`Type` determines the kind of replica the destination node will manage,
be it on disc, in RAM, or both. The possible values for `Type` mean:

- ram_copies: the replica is kept only in RAM on the destination node.
- disc_copies: the replica is kept in RAM and also on disc.
- disc_only_copies: the replica resides on disc only.

Example:

Add a disc-only replica of the bucket "objects" on the node "pri@lab":

```
add_replica(objects, 'pri@lab', disc_only_copies).
ok
```
""".
-spec add_replica(Bucket, Node, Type) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Type :: disc_copies | disc_only_copies | ram_copies,
    Reason :: term().
add_replica(Bucket, Node, Type) ->
    Request = {replica, Bucket, Node, Type},
    gen_server:call(?MODULE, Request).

-doc """
Same as:

```
add_replica(Bucket, Node, disc_only_copies).
```
""".
-spec add_replica(Bucket, Node) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Reason :: term().
add_replica(Bucket, Node) ->
    add_replica(Bucket, Node, disc_only_copies).

-doc """
Same as:

```
create(Bucket, [node()], disc_only_copies).
```
""".
-spec create(Bucket) -> ok | {error, Reason} when
    Bucket :: atom(),
    Reason :: term().
create(Bucket) ->
    create(Bucket, [node()]).

-doc """
Same as:

```
create(Bucket, Nodes, disc_only_copies).
```
""".
-spec create(Bucket, Nodes) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Reason :: term().
create(Bucket, Nodes) ->
    create(Bucket, Nodes, disc_only_copies).

-doc """
Create a new `Bucket` on the specified list of `Nodes`.

Spec:

```
-spec create(Bucket, Nodes, Type) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Type :: ram_copies | disc_copies | disc_only_copies,
    Reason :: term().
```

The possible values for `Type` mean:

- ram_copies: all the bucket copies are kept only in RAM.
- disc_copies: all the bucket copies are kept in RAM and also on disc.
- disc_only_copies: all the bucket copies are kept on disc only.

The bucket is created with the default size of 2GB, and a replica is set
up between the given nodes.

Example:

```
create(objects, ['lab1@pri', 'lab2@sec'], disc_only_copies).
ok
```

Creates the 2GB bucket "objects" on 'lab1@pri' and sets up a replica on
'lab2@sec'. Both copies are on disc only.

If you have different needs, create a single bucket instance and then add
replicas of it by calling the `add_replica/3` function.
""".
-spec create(Bucket, Nodes, Type) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Type :: ram_copies | disc_copies | disc_only_copies,
    Reason :: term().
create(Bucket, Nodes, Type) ->
    Request = {create, Bucket, Nodes, Type},
    gen_server:call(?MODULE, Request).

-doc """
Delete a bucket replica.

Spec:

```
-spec delete_replica(Bucket, Node) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Reason :: term().
```

Deletes the `Bucket` replica from the given `Node`.
""".
-spec delete_replica(Bucket, Node) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Reason :: term().
delete_replica(Bucket, Node) ->
    Request = {delete_replica, Bucket, Node},
    gen_server:call(?MODULE, Request).

-doc """
Same as:

```
grow(Bucket, [node()], 1).
```
""".
-spec grow(Bucket) -> ok | {error, Reason} when
    Bucket :: atom(),
    Reason :: term().
grow(Bucket) ->
    grow(Bucket, [node()]).

-doc """
Same as:

```
grow(Bucket, Nodes, 1).
```
""".
-spec grow(Bucket, Nodes) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Reason :: term().
grow(Bucket, Nodes) ->
    grow(Bucket, Nodes, 1).

-doc """
Add additional partitions to a bucket.

Spec:

```
-spec grow(Bucket, Nodes, Num) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Num :: pos_integer(),
    Reason :: term().
```

`Num` is the number of partitions to add to the existing `Bucket`.
Every added partition increases the bucket size by 2GB. That is, the total
size increase of the bucket is `Num` * 2GB.

The created partitions are replicated on the given list of `Nodes`, with
priority given to nodes based on the order of the list.

Example:

Assuming the "objects" bucket exists:

```
grow(objects, ['lab1@pri', 'lab2@sec'], 4).
ok
```

Adds 4 new partitions to the "objects" bucket on 'lab1@pri', replicating
them on 'lab2@sec', with a total size increase of 4 * 2GB = 8GB.
""".
-spec grow(Bucket, Nodes, Num) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Num :: pos_integer(),
    Reason :: term().
grow(Bucket, Nodes, Num) ->
    Request = {grow, Bucket, Nodes, Num},
    gen_server:call(?MODULE, Request).

-doc """
Move a bucket replica copy between nodes.

Spec:

```
-spec move(Bucket, Source, Dest) -> ok | {error, Reason} when
    Bucket :: atom(),
    Source :: atom(),
    Dest :: atom(),
    Reason :: term().
```

The copy of `Bucket` living on `Source` is moved to `Dest` and deleted from
the source node once the operation is completed.
""".
-spec move(Bucket, Source, Dest) -> ok | {error, Reason} when
    Bucket :: atom(),
    Source :: atom(),
    Dest :: atom(),
    Reason :: term().
move(Bucket, Source, Dest) ->
    Request = {move, Bucket, Source, Dest},
    gen_server:call(?MODULE, Request).

-doc """
Delete the given `Bucket` and all its replicas.

Spec:

```
-spec remove(Bucket) -> ok | {error, Reason} when
    Bucket :: atom(),
    Reason :: term().
```
""".
-spec remove(Bucket) -> ok | {error, Reason} when
    Bucket :: atom(),
    Reason :: term().
remove(Bucket) ->
    Request = {remove, Bucket},
    gen_server:call(?MODULE, Request).

-doc """
Delete the replica of the given `Bucket` from `Node`.

Spec:

```
-spec remove(Bucket, Node) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Reason :: term().
```
""".
-spec remove(Bucket, Node) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Reason :: term().
remove(Bucket, Node) ->
    Request = {remove, Bucket, Node},
    gen_server:call(?MODULE, Request).

-doc """
Same as:

```
shrink(Bucket, 1).
```
""".
-spec shrink(Bucket) -> ok | {error, Reason} when
    Bucket :: atom(),
    Reason :: term().
shrink(Bucket) ->
    shrink(Bucket, 1).

-doc """
Delete bucket partitions.

Spec:

```
-spec shrink(Bucket, Num) -> ok | {error, Reason} when
    Bucket :: atom(),
    Num :: pos_integer(),
    Reason :: term().
```

Removes `Num` partitions from the given `Bucket`. Objects stored on the
deleted partitions are moved to the remaining ones automatically.

Removing partitions shrinks the total bucket size by 2GB * `Num`.

Example:

```
shrink(objects, 6).
ok
```

Removes 6 partitions from the "objects" bucket, reducing its size by
2GB * 6 = 12GB.
""".
-spec shrink(Bucket, Num) -> ok | {error, Reason} when
    Bucket :: atom(),
    Num :: pos_integer(),
    Reason :: term().
shrink(Bucket, Num) ->
    Request = {shrink, Bucket, Num},
    gen_server:call(?MODULE, Request).

%
% Record operation functions.
%

-doc """
Delete an object from a bucket.

Spec:

```
-spec delete(Bucket, Key) -> ok | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Reason :: term().
```
""".
-spec delete(Bucket, Key) -> ok | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Reason :: term().
delete(Bucket, Key) ->
    gen_server:call(?MODULE, {delete, Bucket, Key}).

-doc """
Read a list of objects from a bucket.

Spec:

```
-spec list(Bucket, Metadata) -> {ok, Records} | {error, Reason} when
    Bucket :: atom(),
    Metadata :: metadata(),
    Records :: [object()],
    Reason :: term().
```

An object is returned only when its metadata contains every key-value pair
given in `Metadata`; values are compared for exact equality. Passing an
empty `Metadata` list returns every object in the bucket.

Example:

```
{ok, Objs} = list(objects, [{tags, [foo, bar]}]).
Objs.
[#object{key = o1,
         tags = [foo, bar],
         value = {ipse, dixit},
         metadata = [{tags, [foo, bar]},{mime, 'application/octect-stream'}]
        },
 #object{key = o2,
         tags = [foo, bar],
         value = 42,
         metadata = [{tags, [foo, bar]},{mime, 'application/octect-stream'}]
        }
]
```
""".
-spec list(Bucket, Metadata) -> {ok, Records} | {error, Reason} when
    Bucket :: atom(),
    Metadata :: metadata(),
    Records :: [object()],
    Reason :: term().
list(Bucket, Metadata) ->
    gen_server:call(?MODULE, {list, Bucket, Metadata}).

-doc """
Read an object from a bucket.

Spec:

```
-spec read(Bucket, Key) -> {ok, Record} | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Record :: [object()],
    Reason :: term().
```

Example:

```
{ok, [R]} = read(objects, o1).
R.
#object{key = o1,value = {ipse, dixit},
        metadata = [{tags, [foo, bar]},{mime, 'application/octet-stream'}]}
```
""".
-spec read(Bucket, Key) -> {ok, Record} | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Record :: [object()],
    Reason :: term().
read(Bucket, Key) ->
    gen_server:call(?MODULE, {read, Bucket, Key}).

-doc """
Same as:

```
write(Bucket, Key, Term, []).
```
""".
-spec write(Bucket, Key, Term) -> ok | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Term :: term(),
    Reason :: term().
write(Bucket, Key, Term) ->
    write(Bucket, Key, Term, []).

-doc """
Store an object in a provided bucket.

Spec:

```
-spec write(Bucket, Key, Term, Metadata) -> ok | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Term :: term(),
    Metadata :: metadata(),
    Reason :: term().
```

Stores an object identified by `Key` with value `Term` in `Bucket`. The key
must be unique; writing an object with the same key as an existing one
updates the existing object.

`Metadata` represents additional data attached to the stored object, useful
when later searching for objects using `list/2`.

Example:

```
write(objects, hello, world, [{tags, [blog, post]}]).
ok
```

Stores an object identified by the key "hello" in the "objects" bucket,
tagged with the atoms `blog` and `post` in its metadata.
""".
-spec write(Bucket, Key, Term, Metadata) -> ok | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Term :: term(),
    Metadata :: metadata(),
    Reason :: term().
write(Bucket, Key, Term, Metadata) ->
    gen_server:call(?MODULE, {write, Bucket, Key, Term, Metadata}).

%
% Module callbacks.
%

% The server state is passed through unchanged on code upgrade.
code_change(_OldVsn, N, _Extra) ->
    {ok, N}.

% Create the node-local, RAM-only MIME table and fill it from the MIME
% definition file found in the application priv directory.
%
% Loading the MIME table is best effort: the server starts even when the
% table cannot be populated, but the failure is logged instead of being
% silently dropped.
init([]) ->
    process_flag(trap_exit, true),
    % The result is deliberately not matched: the table may already exist
    % when the server is restarted without terminate/2 having run.
    _ = mnesia:create_table(mime, [{attributes, record_info(fields, mime)},
                                   {ram_copies, [node()]},
                                   {local_content, true}]),
    case load_mimes() of
        ok ->
            ok;
        {error, Reason} ->
            logger:warning("storage: MIME table not loaded: ~p", [Reason])
    end,
    {ok, 0}.

% Drop the MIME table when the server terminates.
terminate(_Reason, _N) ->
    mnesia:delete_table(mime),
    ok.

% Read the MIME definitions and store them in the `mime` table.
% The file is read before the transaction starts, so the transaction fun
% only performs Mnesia operations.
-spec load_mimes() -> ok | {error, term()}.
load_mimes() ->
    case read_mime_file() of
        {ok, Mimes} ->
            Store = fun() ->
                            % Entries that are not 2-tuples are skipped.
                            _ = [mnesia:write(#mime{ext = E, mtype = T})
                                 || {E, T} <- Mimes],
                            ok
                    end,
            case mnesia:transaction(Store) of
                {atomic, ok} -> ok;
                {aborted, Reason} -> {error, Reason}
            end;
        {error, _} = Error ->
            Error
    end.

% Locate and parse the MIME definition file of the application owning this
% module. Returns the list of terms read by file:consult/1.
-spec read_mime_file() -> {ok, [term()]} | {error, term()}.
read_mime_file() ->
    case application:get_application(?MODULE) of
        {ok, App} ->
            case code:priv_dir(App) of
                {error, Reason} -> {error, Reason};
                PrivDir -> file:consult(PrivDir ++ ?MIMEFILE)
            end;
        undefined ->
            {error, no_application}
    end.

%
% Bucket operations callbacks.
%

% Synchronous request dispatcher. Every clause replies with `ok`,
% `{ok, Result}` or `{error, Reason}` and leaves the state untouched.
handle_call({node, Node}, _From, State) ->
    {reply, join_node(Node), State};
handle_call({create, Bucket, Nodes, Type}, _From, State) ->
    {reply, create_bucket(Bucket, Nodes, Type), State};
handle_call({delete_replica, Bucket, Node}, _From, State) ->
    {reply, reply_status(mnesia:del_table_copy(Bucket, Node)), State};
handle_call({grow, Bucket, Nodes, Num}, _From, State) ->
    {reply, grow_bucket(Bucket, Nodes, Num), State};
handle_call({move, Bucket, Source, Dest}, _From, State) ->
    {reply, reply_status(mnesia:move_table_copy(Bucket, Source, Dest)), State};
handle_call({remove, Bucket}, _From, State) ->
    {reply, reply_status(mnesia:delete_table(Bucket)), State};
handle_call({remove, Bucket, Node}, _From, State) ->
    {reply, reply_status(mnesia:del_table_copy(Bucket, Node)), State};
handle_call({replica, Bucket, Node, Type}, _From, State) ->
    {reply, reply_status(mnesia:add_table_copy(Bucket, Node, Type)), State};
handle_call({shrink, Bucket, Num}, _From, State) ->
    {reply, shrink_bucket(Bucket, Num), State};

%
% Record operation callbacks.
%

%
% Read callbacks.
%
handle_call({list, Bucket, Metadata}, _From, State) ->
    {reply, search(Bucket, Metadata), State};
handle_call({read, Bucket, Key}, _From, State) ->
    Reply = case frag_transaction(fun() -> mnesia:read(Bucket, Key) end) of
                {atomic, Objects} -> {ok, Objects};
                {aborted, Reason} -> {error, Reason}
            end,
    {reply, Reply, State};

%
% Write callbacks.
%
handle_call({delete, Bucket, Key}, _From, State) ->
    Delete = fun() -> mnesia:delete({Bucket, Key}) end,
    {reply, reply_status(frag_transaction(Delete)), State};
handle_call({write, Bucket, Key, Term}, From, State) ->
    handle_call({write, Bucket, Key, Term, []}, From, State);
handle_call({write, Bucket, Key, Term, Metadata}, _From, State) ->
    Ext = key_extension(Key),
    Store = fun() ->
                    % The MIME type is looked up by the key's extension;
                    % unknown extensions get the default type.
                    Mime = case mnesia:read(mime, Ext) of
                               [] -> ?DEFMIME;
                               [#mime{mtype = Type}] -> Type
                           end,
                    Object = #object{key = Key,
                                     value = Term,
                                     metadata = lists:flatten([{mime, Mime},
                                                               Metadata])},
                    mnesia:write(Bucket, Object, write)
            end,
    {reply, reply_status(frag_transaction(Store)), State}.

% Asynchronous requests are not used; anything received is dropped.
handle_cast(_Msg, State) ->
    {noreply, State}.

% Out-of-band messages are dropped.
handle_info(_Info, State) ->
    {noreply, State}.

%
% Private functions.
%

% Translate the result of a Mnesia schema operation or transaction into the
% `ok | {error, Reason}` shape used in replies.
-spec reply_status({atomic, term()} | {aborted, Reason}) ->
          ok | {error, Reason}.
reply_status({atomic, _}) -> ok;
reply_status({aborted, Reason}) -> {error, Reason}.

% Run `Fun` as a transaction through the `mnesia_frag` access module, so that
% reads and writes are routed to the fragment owning each key. Buckets are
% fragmented tables; a plain mnesia:transaction/1 would only touch the base
% fragment.
%
% mnesia:activity/4 exits on abort, so the exit is converted back to the
% `{atomic, Result} | {aborted, Reason}` shape of mnesia:transaction/1.
-spec frag_transaction(fun(() -> Result)) ->
          {atomic, Result} | {aborted, term()}.
frag_transaction(Fun) ->
    try mnesia:activity(transaction, Fun, [], mnesia_frag) of
        Result -> {atomic, Result}
    catch
        exit:{aborted, Reason} -> {aborted, Reason}
    end.

% Start Mnesia on `Node`, connect it to this cluster and turn its schema
% into a disc copy. A failure at any step is returned as `{error, Reason}`
% instead of crashing the server.
-spec join_node(Node) -> ok | {error, Reason} when
    Node :: atom(),
    Reason :: term().
join_node(Node) ->
    maybe
        ok ?= rpc:call(Node, mnesia, start, []),
        {ok, _} ?= mnesia:change_config(extra_db_nodes, [Node]),
        {atomic, ok} ?= mnesia:change_table_copy_type(schema, Node,
                                                      disc_copies),
        ok
    else
        {badrpc, Reason} -> {error, Reason};
        {aborted, Reason} -> {error, Reason};
        {error, _} = Error -> Error
    end.

% Create the bucket table and activate fragmentation on it. The table
% creation result is checked, so the caller gets the real failure reason
% (for example an already existing bucket).
-spec create_bucket(Bucket, Nodes, Type) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Type :: ram_copies | disc_copies | disc_only_copies,
    Reason :: term().
create_bucket(Bucket, Nodes, Type) ->
    maybe
        {atomic, ok} ?= mnesia:create_table(Bucket,
                                            [{attributes,
                                              record_info(fields, object)},
                                             {Type, Nodes},
                                             {record_name, object}]),
        {atomic, ok} ?= mnesia:change_table_frag(Bucket, {activate, []}),
        ok
    else
        {aborted, Reason} -> {error, Reason}
    end.

% Extension of `Key` when it can be read as a file name, "" otherwise.
% Keys are arbitrary terms and filename:extension/1 raises on terms that are
% not file names; those keys simply have no extension.
-spec key_extension(Key :: term()) -> string() | binary().
key_extension(Key) ->
    try
        filename:extension(Key)
    catch
        error:_ -> ""
    end.

% Add `Num` fragments to `Bucket`, one at a time, stopping at the first
% failure. Anything other than a non-negative integer is rejected; without
% this check a negative count would never reach zero.
-spec grow_bucket(Bucket, Nodes, Num) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Num :: term(),
    Reason :: term().
grow_bucket(_Bucket, _Nodes, 0) ->
    ok;
grow_bucket(Bucket, Nodes, Num) when is_integer(Num), Num > 0 ->
    case mnesia:change_table_frag(Bucket, {add_frag, Nodes}) of
        {atomic, ok} -> grow_bucket(Bucket, Nodes, Num - 1);
        {aborted, Reason} -> {error, Reason}
    end;
grow_bucket(_Bucket, _Nodes, Num) ->
    {error, {badarg, Num}}.

% Remove `Num` fragments from `Bucket`, one at a time, stopping at the first
% failure. Anything other than a non-negative integer is rejected; without
% this check a negative count would never reach zero.
-spec shrink_bucket(Bucket, Num) -> ok | {error, Reason} when
    Bucket :: atom(),
    Num :: term(),
    Reason :: term().
shrink_bucket(_Bucket, 0) ->
    ok;
shrink_bucket(Bucket, Num) when is_integer(Num), Num > 0 ->
    case mnesia:change_table_frag(Bucket, del_frag) of
        {atomic, ok} -> shrink_bucket(Bucket, Num - 1);
        {aborted, Reason} -> {error, Reason}
    end;
shrink_bucket(_Bucket, Num) ->
    {error, {badarg, Num}}.

% Return the objects of `Bucket` whose metadata satisfies filter/2 for the
% given `Metadata`. The query runs through the fragmentation-aware
% transaction so every fragment of the bucket is visited.
-spec search(Bucket, Metadata) -> {ok, Objects} | {error, Reason} when
    Bucket :: atom(),
    Metadata :: metadata(),
    Objects :: [object()],
    Reason :: term().
search(Bucket, Metadata) ->
    Query = qlc:q([Object || Object <- mnesia:table(Bucket),
                             filter(Metadata, Object#object.metadata)]),
    case frag_transaction(fun() -> qlc:e(Query) end) of
        {atomic, Objects} -> {ok, Objects};
        {aborted, Reason} -> {error, Reason}
    end.

%
% A record is reported if and only if it contains all the key-value
% pairs passed in the first list.
%
% As a nice side effect, passing an empty metadata list `[]` matches
% every record. Useful to list all the records in a specific table.
%
filter(Wanted, Metadata) ->
    lists:all(fun({Key, Value}) ->
                      case lists:keyfind(Key, 1, Metadata) of
                          false -> false;
                          {_Key, Found} -> Found =:= Value
                      end
              end,
              Wanted).