dudeswave2/storage/src/storage.erl

794 lines
18 KiB
Erlang

%
% Copyright (c) 2024 Andrea Biscuola <a@abiscuola.com>
%
% Permission to use, copy, modify, and distribute this software for any
% purpose with or without fee is hereby granted, provided that the above
% copyright notice and this permission notice appear in all copies.
%
% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
%
-module(storage).
-moduledoc """
Object storage module
This module provides functions to interact with the storage application.
Its functionality includes reading and writing records, creating and
deleting buckets, and managing their size and replication across a
cluster.
Objects stored in a bucket are defined as:
```
-record(object, {key :: term(),
value :: term(),
tags = [] :: list(term()),
metadata = [] :: list({term(), term()})}).
```
To import the object definition in your module, include the "storage.hrl"
file:
```
-include_lib("storage/include/storage.hrl").
```
""".
-behaviour(gen_server).
% Server start function
-export([start_link/0]).
% Public bucket operation functions
-export([add_node/1, add_replica/2, add_replica/3,
create/1, create/2, create/3,
delete_replica/2, grow/1, grow/2, grow/3,
move/3, remove/1, remove/2, shrink/1,
shrink/2]).
% Public record operation functions
-export([delete/2, list/2, read/2, write/3, write/4]).
% Module callbacks
-export([code_change/3,
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2]).
-include_lib("stdlib/include/qlc.hrl").
-include_lib("storage/include/storage.hrl").
-type object() :: #object{}.
-type metadata() :: [{atom(), term()}].
-export_type([object/0, metadata/0]).
%
% Server start function: starts the storage gen_server, registered
% locally under the module name, and links it to the calling process.
%
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%
% Buckets management functions.
%
-doc """
Add a node to the storage cluster.

Spec:
```
-spec add_node(Node) -> ok | {error, Reason} when
    Node :: atom(),
    Reason :: term().
```
Example:
```
add_node('b@piwa').
ok
```
Bootstraps the node `b@piwa` and joins it to the storage cluster.
""".
-spec add_node(Node) -> ok | {error, Reason} when
    Node :: atom(),
    Reason :: term().
add_node(Node) ->
    Request = {node, Node},
    gen_server:call(?MODULE, Request).
-doc """
Create a replica of the given `Bucket` on `Node`.

Spec:
```
-spec add_replica(Bucket, Node, Type) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Type :: disc_copies | disc_only_copies | ram_copies,
    Reason :: term().
```
`Type` selects how the destination node stores the replica:
- ram_copies: the replica is kept in RAM only.
- disc_copies: the replica is kept in RAM and also on disc.
- disc_only_copies: the replica is kept on disc only.

Example:
Add a disc-only replica of the bucket "objects" on the node "pri@lab":
```
add_replica(objects, 'pri@lab', disc_only_copies).
ok
```
""".
-spec add_replica(Bucket, Node, Type) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Type :: disc_copies | disc_only_copies | ram_copies,
    Reason :: term().
add_replica(Bucket, Node, Type) ->
    Request = {replica, Bucket, Node, Type},
    gen_server:call(?MODULE, Request).
-doc """
Shorthand for a disc-only replica; equivalent to:
```
add_replica(Bucket, Node, disc_only_copies).
```
""".
-spec add_replica(Bucket, Node) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Reason :: term().
add_replica(Bucket, Node) ->
    DefaultType = disc_only_copies,
    gen_server:call(?MODULE, {replica, Bucket, Node, DefaultType}).
-doc """
Create a disc-only bucket on the local node; equivalent to:
```
create(Bucket, [node()], disc_only_copies).
```
""".
-spec create(Bucket) -> ok | {error, Reason} when
    Bucket :: atom(),
    Reason :: term().
create(Bucket) ->
    LocalNode = node(),
    gen_server:call(?MODULE, {create, Bucket, [LocalNode], disc_only_copies}).
-doc """
Create a disc-only bucket on `Nodes`; equivalent to:
```
create(Bucket, Nodes, disc_only_copies).
```
""".
-spec create(Bucket, Nodes) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Reason :: term().
create(Bucket, Nodes) ->
    DefaultType = disc_only_copies,
    gen_server:call(?MODULE, {create, Bucket, Nodes, DefaultType}).
-doc """
Create a new `Bucket` on the specified list of `Nodes`.

Spec:
```
-spec create(Bucket, Nodes, Type) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Type :: ram_copies | disc_copies | disc_only_copies,
    Reason :: term().
```
`Type` selects how every copy of the bucket is stored:
- ram_copies: all the copies are kept in RAM only.
- disc_copies: all the copies are kept in RAM and also on disc.
- disc_only_copies: all the copies are kept on disc only.

The bucket is created with the default size of 2GB, and a replica is
set up on each of the given nodes.

Example:
```
create(objects, ['lab1@pri', 'lab2@sec'], disc_only_copies).
ok
```
Creates the 2GB bucket "objects" on 'lab1@pri' and sets up a replica on
'lab2@sec'. Both copies are kept on disc only.

If you need a different layout, create a single bucket instance and then
add replicas to it with `add_replica/3`.
""".
-spec create(Bucket, Nodes, Type) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Type :: ram_copies | disc_copies | disc_only_copies,
    Reason :: term().
create(Bucket, Nodes, Type) ->
    Request = {create, Bucket, Nodes, Type},
    gen_server:call(?MODULE, Request).
-doc """
Delete a bucket replica.

Spec:
```
-spec delete_replica(Bucket, Node) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Reason :: term().
```
Removes the copy of `Bucket` held by `Node`.
""".
-spec delete_replica(Bucket, Node) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Reason :: term().
delete_replica(Bucket, Node) ->
    Request = {delete_replica, Bucket, Node},
    gen_server:call(?MODULE, Request).
-doc """
Add one partition to `Bucket` on the local node; equivalent to:
```
grow(Bucket, [node()], 1).
```
""".
-spec grow(Bucket) -> ok | {error, Reason} when
    Bucket :: atom(),
    Reason :: term().
grow(Bucket) ->
    LocalNode = node(),
    gen_server:call(?MODULE, {grow, Bucket, [LocalNode], 1}).
-doc """
Add one partition to `Bucket` on `Nodes`; equivalent to:
```
grow(Bucket, Nodes, 1).
```
""".
-spec grow(Bucket, Nodes) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Reason :: term().
grow(Bucket, Nodes) ->
    SinglePartition = 1,
    gen_server:call(?MODULE, {grow, Bucket, Nodes, SinglePartition}).
-doc """
Add additional partitions to a bucket.

Spec:
```
-spec grow(Bucket, Nodes, Num) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Num :: pos_integer(),
    Reason :: term().
```
`Num` is the number of partitions to add to the existing `Bucket`. Every
added partition increases the bucket size by 2GB, so the total size
increase of the bucket is `Num` * 2GB.

The created partitions are replicated on the given list of `Nodes`,
with priority given to nodes based on their order in the list.

Example:
Assuming the "objects" bucket exists:
```
grow(objects, ['lab1@pri', 'lab2@sec'], 4).
ok
```
Adds 4 new partitions to the "objects" bucket on 'lab1@pri', replicating
them on 'lab2@sec', for a total size increase of 4 * 2GB = 8GB.
""".
-spec grow(Bucket, Nodes, Num) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Num :: pos_integer(),
    Reason :: term().
grow(Bucket, Nodes, Num) ->
    gen_server:call(?MODULE, {grow, Bucket, Nodes, Num}).
-doc """
Move a bucket replica copy between nodes.

Spec:
```
-spec move(Bucket, Source, Dest) -> ok | {error, Reason} when
    Bucket :: atom(),
    Source :: atom(),
    Dest :: atom(),
    Reason :: term().
```
The copy of `Bucket` held by `Source` is moved to `Dest`, and removed
from the source node once the operation completes.
""".
-spec move(Bucket, Source, Dest) -> ok | {error, Reason} when
    Bucket :: atom(),
    Source :: atom(),
    Dest :: atom(),
    Reason :: term().
move(Bucket, Source, Dest) ->
    Request = {move, Bucket, Source, Dest},
    gen_server:call(?MODULE, Request).
-doc """
Delete the given `Bucket` and all its replicas.

Spec:
```
-spec remove(Bucket) -> ok | {error, Reason} when
    Bucket :: atom(),
    Reason :: term().
```
""".
-spec remove(Bucket) -> ok | {error, Reason} when
    Bucket :: atom(),
    Reason :: term().
remove(Bucket) ->
    gen_server:call(?MODULE, {remove, Bucket}).
-doc """
Delete the replica of the given `Bucket` held by `Node`.

Spec:
```
-spec remove(Bucket, Node) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Reason :: term().
```
""".
-spec remove(Bucket, Node) -> ok | {error, Reason} when
    Bucket :: atom(),
    Node :: atom(),
    Reason :: term().
remove(Bucket, Node) ->
    Request = {remove, Bucket, Node},
    gen_server:call(?MODULE, Request).
-doc """
Remove a single partition from `Bucket`; equivalent to:
```
shrink(Bucket, 1).
```
""".
-spec shrink(Bucket) -> ok | {error, Reason} when
    Bucket :: atom(),
    Reason :: term().
shrink(Bucket) ->
    gen_server:call(?MODULE, {shrink, Bucket, 1}).
-doc """
Delete bucket partitions.

Spec:
```
-spec shrink(Bucket, Num) -> ok | {error, Reason} when
    Bucket :: atom(),
    Num :: pos_integer(),
    Reason :: term().
```
Remove `Num` partitions from the given `Bucket`. Objects stored on the
deleted partitions are moved to the remaining ones automatically.
Removing partitions shrinks the total bucket size by 2GB * `Num`.

Example:
```
shrink(objects, 6).
ok
```
Removes 6 partitions from the "objects" bucket, reducing its size by
2GB * 6 = 12GB.
""".
-spec shrink(Bucket, Num) -> ok | {error, Reason} when
    Bucket :: atom(),
    Num :: pos_integer(),
    Reason :: term().
shrink(Bucket, Num) ->
    gen_server:call(?MODULE, {shrink, Bucket, Num}).
%
% Record operation functions.
%
-doc """
Delete an object from a bucket.

Spec:
```
-spec delete(Bucket, Key) -> ok | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Reason :: term().
```
Removes the object stored under `Key` in `Bucket`.
""".
-spec delete(Bucket, Key) -> ok | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Reason :: term().
delete(Bucket, Key) ->
    Request = {delete, Bucket, Key},
    gen_server:call(?MODULE, Request).
-doc """
Read a list of objects from a bucket.

Spec:
```
-spec list(Bucket, Metadata) -> {ok, Records} | {error, Reason} when
    Bucket :: atom(),
    Metadata :: metadata(),
    Records :: [object()],
    Reason :: term().
```
Returns the objects of `Bucket` whose metadata contains every
`{Key, Value}` pair listed in `Metadata`. Values are compared for exact
equality (`=:=`), so querying `[{tags, [foo]}]` does not match an object
whose metadata holds `{tags, [foo, bar]}`. An empty `Metadata` list
returns every object in the bucket.

Example:
```
{ok, Objs} = list(objects, [{tags, [foo, bar]}]).
Objs.
[#object{key = o1, value = {ipse, dixit}, tags = [],
         metadata = [{tags, [foo, bar]}]},
 #object{key = o2, value = 42, tags = [],
         metadata = [{tags, [foo, bar]}]}]
```
""".
%% The server replies {error, Reason} when the query transaction aborts,
%% so that outcome is part of the contract.
-spec list(Bucket, Metadata) -> {ok, Records} | {error, Reason} when
    Bucket :: atom(),
    Metadata :: metadata(),
    Records :: [object()],
    Reason :: term().
list(Bucket, Metadata) ->
    gen_server:call(?MODULE, {list, Bucket, Metadata}).
-doc """
Read an object from a bucket.

Spec:
```
-spec read(Bucket, Key) -> {ok, Records} | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Records :: [object()],
    Reason :: term().
```
`Records` is `[]` when no object is stored under `Key`, and a
one-element list holding the object otherwise.

Example:
```
{ok, [R]} = read(objects, o1).
R.
#object{key = o1, value = {ipse, dixit}, tags = [],
        metadata = [{tags, [foo, bar]}]}
```
""".
-spec read(Bucket, Key) -> {ok, Records} | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Records :: [object()],
    Reason :: term().
read(Bucket, Key) ->
    gen_server:call(?MODULE, {read, Bucket, Key}).
-doc """
Store an object with no metadata; behaves like:
```
write(Bucket, Key, Term, []).
```
""".
-spec write(Bucket, Key, Term) -> ok | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Term :: term(),
    Reason :: term().
write(Bucket, Key, Term) ->
    Request = {write, Bucket, Key, Term},
    gen_server:call(?MODULE, Request).
-doc """
Store an object in a provided bucket.

Spec:
```
-spec write(Bucket, Key, Term, Metadata) -> ok | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Term :: term(),
    Metadata :: metadata(),
    Reason :: term().
```
Store an object identified by `Key` with value `Term` in `Bucket`. The
key must be unique: writing an object with the same key as an existing
one updates the existing object.

`Metadata` represents additional data attached to the stored object,
useful when later searching for objects using `list/2`.

Example:
```
write(objects, hello, world, [{tags, [blog, post]}]).
ok
```
Stores an object identified by the key "hello" in the "objects" bucket,
tagged with the atoms `blog` and `post` in its metadata.
""".
-spec write(Bucket, Key, Term, Metadata) -> ok | {error, Reason} when
    Bucket :: atom(),
    Key :: term(),
    Term :: term(),
    Metadata :: metadata(),
    Reason :: term().
write(Bucket, Key, Term, Metadata) ->
    gen_server:call(?MODULE, {write, Bucket, Key, Term, Metadata}).
%
% Module callbacks.
%
% Code upgrades need no state migration: the state is returned untouched.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
% Initialise the server. The state is a placeholder (0) that no callback
% in this module ever changes. Exits are trapped, presumably so that
% terminate/2 runs on supervisor shutdown — confirm intent.
init([]) ->
    State = 0,
    process_flag(trap_exit, true),
    {ok, State}.
% Nothing to clean up on shutdown.
terminate(_Reason, _State) ->
    ok.
%
% Bucket operations callbacks.
%
%
% Request dispatcher for the public API. Every clause replies with `ok`,
% `{ok, Result}` or `{error, Reason}` and passes the state through unchanged.
%
% Buckets are fragmented mnesia tables: the `create` clause activates
% fragmentation, and `grow`/`shrink` add and delete fragments. Mnesia only
% routes a key to the fragment that holds it when the `mnesia_frag` access
% module is used; a plain `mnesia:transaction/1` works on the base table
% alone. Record operations therefore go through frag_transaction/1 and
% frag_update/1.
%
% NOTE(review): `replica`, `delete_replica`, `move` and `remove/2` call the
% plain table-copy functions on `Bucket`. For a fragmented table these
% presumably affect the base fragment only — confirm.
%
handle_call({node, Node}, _From, State) ->
    {reply, join_node(Node), State};
handle_call({create, Bucket, Nodes, Type}, _From, State) ->
    Options = [{attributes, record_info(fields, object)},
               {Type, Nodes},
               {record_name, object}],
    % Check the creation result first: activating fragmentation on a table
    % that could not be created would hide the real failure reason.
    Reply = case mnesia:create_table(Bucket, Options) of
                {atomic, ok} ->
                    schema_result(mnesia:change_table_frag(Bucket, {activate, []}));
                {aborted, Reason} ->
                    {error, Reason}
            end,
    {reply, Reply, State};
handle_call({delete_replica, Bucket, Node}, _From, State) ->
    {reply, schema_result(mnesia:del_table_copy(Bucket, Node)), State};
handle_call({grow, Bucket, Nodes, Num}, _From, State) ->
    {reply, grow_bucket(Bucket, Nodes, Num), State};
handle_call({move, Bucket, Source, Dest}, _From, State) ->
    {reply, schema_result(mnesia:move_table_copy(Bucket, Source, Dest)), State};
handle_call({remove, Bucket}, _From, State) ->
    {reply, schema_result(mnesia:delete_table(Bucket)), State};
handle_call({remove, Bucket, Node}, _From, State) ->
    {reply, schema_result(mnesia:del_table_copy(Bucket, Node)), State};
handle_call({replica, Bucket, Node, Type}, _From, State) ->
    {reply, schema_result(mnesia:add_table_copy(Bucket, Node, Type)), State};
handle_call({shrink, Bucket, Num}, _From, State) ->
    {reply, shrink_bucket(Bucket, Num), State};
%
% Record operation callbacks.
%
handle_call({list, Bucket, Metadata}, _From, State) ->
    {reply, search(Bucket, Metadata), State};
handle_call({read, Bucket, Key}, _From, State) ->
    {reply, frag_transaction(fun() -> mnesia:read(Bucket, Key) end), State};
handle_call({delete, Bucket, Key}, _From, State) ->
    {reply, frag_update(fun() -> mnesia:delete({Bucket, Key}) end), State};
handle_call({write, Bucket, Key, Term}, From, State) ->
    % write/3 is write/4 with an empty metadata list.
    handle_call({write, Bucket, Key, Term, []}, From, State);
handle_call({write, Bucket, Key, Term, Metadata}, _From, State) ->
    Object = #object{key = Key, value = Term, metadata = Metadata},
    {reply, frag_update(fun() -> mnesia:write(Bucket, Object, write) end), State}.

%
% Bootstrap Node: start mnesia on it, connect it to this cluster and keep
% a disc copy of the schema there. Failures are returned as
% {error, Reason} rather than crashing the server with a badmatch.
%
-spec join_node(Node :: atom()) -> ok | {error, term()}.
join_node(Node) ->
    case rpc:call(Node, mnesia, start, []) of
        ok -> connect_node(Node);
        {badrpc, Reason} -> {error, {badrpc, Reason}};
        {error, _} = Error -> Error
    end.

% Second half of join_node/1: run once mnesia is up on Node.
-spec connect_node(Node :: atom()) -> ok | {error, term()}.
connect_node(Node) ->
    case mnesia:change_config(extra_db_nodes, [Node]) of
        {ok, _Connected} ->
            schema_result(mnesia:change_table_copy_type(schema, Node, disc_copies));
        {error, _} = Error ->
            Error
    end.

% Map a mnesia schema-operation result onto the API's ok | {error, Reason}.
-spec schema_result({atomic, term()} | {aborted, term()}) -> ok | {error, term()}.
schema_result({atomic, _}) -> ok;
schema_result({aborted, Reason}) -> {error, Reason}.

% Run Fun in a transaction through the mnesia_frag access module, so that
% every fragment of a bucket is visible. mnesia:activity/4 exits with
% {aborted, Reason} on failure; that exit is turned into {error, Reason}.
-spec frag_transaction(fun(() -> Result)) -> {ok, Result} | {error, term()}.
frag_transaction(Fun) ->
    try mnesia:activity(transaction, Fun, [], mnesia_frag) of
        Result -> {ok, Result}
    catch
        exit:{aborted, Reason} -> {error, Reason}
    end.

% frag_transaction/1 for updates whose fun result is not interesting.
-spec frag_update(fun(() -> term())) -> ok | {error, term()}.
frag_update(Fun) ->
    case frag_transaction(Fun) of
        {ok, _} -> ok;
        {error, _} = Error -> Error
    end.
% Casts are not part of the server protocol; any cast is ignored.
handle_cast(_Request, State) ->
    {noreply, State}.
% Plain messages are dropped, including the 'EXIT' signals this process
% receives as messages because init/1 enables trap_exit.
handle_info(_Message, State) ->
    {noreply, State}.
%
% Private functions.
%
%
% Add Num partitions (mnesia fragments) to Bucket, placed on Nodes.
%
% Each partition is added by its own `add_frag` schema transaction. If one
% of them fails, the partitions added before it are kept and the abort
% reason is returned. Num = 0 is a no-op; any other value that is not a
% positive integer is rejected with {error, {badarg, Num}}.
%
-spec grow_bucket(Bucket, Nodes, Num) -> ok | {error, Reason} when
    Bucket :: atom(),
    Nodes :: [atom()],
    Num :: integer(),
    Reason :: term().
grow_bucket(_Bucket, _Nodes, 0) ->
    ok;
grow_bucket(Bucket, Nodes, Num) when is_integer(Num), Num > 0 ->
    case mnesia:change_table_frag(Bucket, {add_frag, Nodes}) of
        {atomic, ok} -> grow_bucket(Bucket, Nodes, Num - 1);
        {aborted, Reason} -> {error, Reason}
    end;
grow_bucket(_Bucket, _Nodes, Num) ->
    % Without this clause a negative count would never reach 0 and would
    % keep adding fragments indefinitely.
    {error, {badarg, Num}}.
%
% Remove Num partitions (mnesia fragments) from Bucket.
%
% Each partition is removed by its own `del_frag` schema transaction. If
% one of them fails, the removals already done are kept and the abort
% reason is returned. Num = 0 is a no-op; any other value that is not a
% positive integer is rejected with {error, {badarg, Num}}.
%
-spec shrink_bucket(Bucket, Num) -> ok | {error, Reason} when
    Bucket :: atom(),
    Num :: integer(),
    Reason :: term().
shrink_bucket(_Bucket, 0) ->
    ok;
shrink_bucket(Bucket, Num) when is_integer(Num), Num > 0 ->
    case mnesia:change_table_frag(Bucket, del_frag) of
        {atomic, ok} -> shrink_bucket(Bucket, Num - 1);
        {aborted, Reason} -> {error, Reason}
    end;
shrink_bucket(_Bucket, Num) ->
    % Without this clause a negative count would never reach 0 and would
    % keep deleting fragments until mnesia refused to delete another one.
    {error, {badarg, Num}}.
%
% Return every object of Bucket whose metadata matches Metadata, as
% decided by filter/2.
%
% The query runs through the mnesia_frag access module, so the qlc
% traversal covers every fragment of the bucket, not just the base table.
% mnesia:activity/4 exits with {aborted, Reason} on failure; that exit is
% returned as {error, Reason}.
%
-spec search(Bucket, Metadata) -> {ok, Objects} | {error, Reason} when
    Bucket :: atom(),
    Metadata :: metadata(),
    Objects :: [object()],
    Reason :: term().
search(Bucket, Metadata) ->
    Query = fun() ->
                    qlc:e(qlc:q([X || X <- mnesia:table(Bucket),
                                      filter(Metadata, X#object.metadata)]))
            end,
    try mnesia:activity(transaction, Query, [], mnesia_frag) of
        Objects -> {ok, Objects}
    catch
        exit:{aborted, Reason} -> {error, Reason}
    end.
%
% Match predicate used by search/2.
%
% Returns true when every {Key, Value} pair in the first argument (the
% requested metadata) is found in the second argument (an object's
% metadata) with an exactly equal (=:=) value. Keys are looked up with
% lists:keyfind/3, so only the first pair stored for a key is considered.
%
% An empty request list matches every object, which is what lets list/2
% return the whole bucket when called with [].
%
filter(Wanted, Have) ->
    lists:all(fun(Pair) ->
                      {Key, Value} = Pair,
                      case lists:keyfind(Key, 1, Have) of
                          false -> false;
                          {_, Stored} -> Stored =:= Value
                      end
              end,
              Wanted).