From 14e6c3cc94852d07220b02eb3c129f8a7e542ac5 Mon Sep 17 00:00:00 2001 From: absc Date: Tue, 23 Jul 2024 22:09:08 +0200 Subject: [PATCH] Import the storage application. This is a small abstraction layer on top of mnesia. It simplifies table management and addition of replicas and nodes. Some things to add in the near future: * Removing nodes from a mnesia cluster is not supported. * Observability and event handling is missing. While it's possible to perform the aformentioned functions with the mnesia own APIs, adding some simplifications for day to day management may be a good idea. However, the library is already used in another project and it's good enough for an initial use. It will be extended if and when required. --- storage/Makefile | 7 + storage/ebin/storage.app | 20 + storage/include/storage.hrl | 19 + storage/priv/mimes.txt | 124 +++++ storage/src/Makefile | 19 + storage/src/storage.erl | 820 +++++++++++++++++++++++++++++ storage/src/storage_app.erl | 25 + storage/src/storage_supervisor.erl | 31 ++ 8 files changed, 1065 insertions(+) create mode 100644 storage/Makefile create mode 100644 storage/ebin/storage.app create mode 100644 storage/include/storage.hrl create mode 100644 storage/priv/mimes.txt create mode 100644 storage/src/Makefile create mode 100644 storage/src/storage.erl create mode 100644 storage/src/storage_app.erl create mode 100644 storage/src/storage_supervisor.erl diff --git a/storage/Makefile b/storage/Makefile new file mode 100644 index 0000000..160022d --- /dev/null +++ b/storage/Makefile @@ -0,0 +1,7 @@ +.PHONY: all clean + +all: + ${MAKE} -C src + +clean: + ${MAKE} -C src clean diff --git a/storage/ebin/storage.app b/storage/ebin/storage.app new file mode 100644 index 0000000..2400a4b --- /dev/null +++ b/storage/ebin/storage.app @@ -0,0 +1,20 @@ +{application, storage, + [{description, "Cluster storage server"}, + {vsn, "1.0.0"}, + {modules, [ + storage_app, + storage, + storage_supervisor + ]}, + {registered, [ + storage, + storage_supervisor + ]}, + {applications, [ + kernel, + stdlib, + mnesia + ]}, + {mod, {storage_app, []}}, + {start_phases, []} +]}. diff --git a/storage/include/storage.hrl b/storage/include/storage.hrl new file mode 100644 index 0000000..51ea0b1 --- /dev/null +++ b/storage/include/storage.hrl @@ -0,0 +1,19 @@ +% +% Copyright (c) 2024 Andrea Biscuola +% +% Permission to use, copy, modify, and distribute this software for any +% purpose with or without fee is hereby granted, provided that the above +% copyright notice and this permission notice appear in all copies. +% +% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +% + +-record(object, {key :: term(), + value :: term(), + metadata = [] :: list({atom(), term()})}). \ No newline at end of file diff --git a/storage/priv/mimes.txt b/storage/priv/mimes.txt new file mode 100644 index 0000000..3702ab1 --- /dev/null +++ b/storage/priv/mimes.txt @@ -0,0 +1,124 @@ +{".gmi", "text/gemini"}. +{".gmni", "text/gemini"}. +{".atom", "application/atom+xml"}. +{".woff", "application/font-woff"}. +{".jar", "application/java-archive"}. +{".war", "application/java-archive"}. +{".ear", "application/java-archive"}. +{".js", "application/javascript"}. +{".json", "application/json"}. +{".hqx", "application/mac-binhex40"}. +{".doc", "application/msword"}. +{".bin", "application/octet-stream"}. +{".exe", "application/octet-stream"}. +{".dll", "application/octet-stream"}. +{".deb", "application/octet-stream"}. +{".dmg", "application/octet-stream"}. +{".fs", "application/octet-stream"}. +{".iso", "application/octet-stream"}. +{".img", "application/octet-stream"}. +{".msi", "application/octet-stream"}. +{".msp", "application/octet-stream"}. +{".msm", "application/octet-stream"}. +{".pdf", "application/pdf"}. +{".ps", "application/postscript"}. +{".eps", "application/postscript"}. +{".ai", "application/postscript"}. +{".rss", "application/rss+xml"}. +{".rtf", "application/rtf"}. +{".m3u8", "application/vnd.apple.mpegurl"}. +{".kml", "application/vnd.google-earth.kml+xml"}. +{".kmz", "application/vnd.google-earth.kmz"}. +{".xls", "application/vnd.ms-excel"}. +{".eot", "application/vnd.ms-fontobject"}. +{".ppt", "application/vnd.ms-powerpoint"}. +{".odc", "application/vnd.oasis.opendocument.chart"}. +{".otc", "application/vnd.oasis.opendocument.chart-template"}. +{".odb", "application/vnd.oasis.opendocument.database"}. +{".odf", "application/vnd.oasis.opendocument.formula"}. +{".otf", "application/vnd.oasis.opendocument.formula-template"}. +{".odg", "application/vnd.oasis.opendocument.graphics"}. +{".otg", "application/vnd.oasis.opendocument.graphics-template"}. +{".odi", "application/vnd.oasis.opendocument.image"}. +{".oti", "application/vnd.oasis.opendocument.image-template"}. +{".odp", "application/vnd.oasis.opendocument.presentation"}. +{".otp", "application/vnd.oasis.opendocument.presentation-template"}. +{".ods", "application/vnd.oasis.opendocument.spreadsheet"}. +{".ots", "application/vnd.oasis.opendocument.spreadsheet-template"}. +{".odt", "application/vnd.oasis.opendocument.text"}. +{".odm", "application/vnd.oasis.opendocument.text-master"}. +{".ott", "application/vnd.oasis.opendocument.text-template"}. +{".oth", "application/vnd.oasis.opendocument.text-web"}. +{".wmlc", "application/vnd.wap.wmlc"}. +{".7z", "application/x-7z-compressed"}. +{".cco", "application/x-cocoa"}. +{".jardiff", "application/x-java-archive-diff"}. +{".jnlp", "application/x-java-jnlp-file"}. +{".run", "application/x-makeself"}. +{".pac", "application/x-ns-proxy-autoconfig"}. +{".pl", "application/x-perl"}. +{".pm", "application/x-perl"}. +{".prc", "application/x-pilot"}. +{".pdb", "application/x-pilot"}. +{".rar", "application/x-rar-compressed"}. +{".rpm", "application/x-redhat-package-manager"}. +{".sea", "application/x-sea"}. +{".swf", "application/x-shockwave-flash"}. +{".sit", "application/x-stuffit"}. +{".tcl", "application/x-tcl"}. +{".tk", "application/x-tcl"}. +{".der", "application/x-x509-ca-cert"}. +{".pem", "application/x-x509-ca-cert"}. +{".crt", "application/x-x509-ca-cert"}. +{".xpi", "application/x-xpinstall"}. +{".xhtml", "application/xhtml+xml"}. +{".zip", "application/zip"}. +{".ez", "application/zip"}. +{".au", "audio/basic"}. +{".snd", "audio/basic"}. +{".mid", "audio/midi"}. +{".midi", "audio/midi"}. +{".kar", "audio/midi"}. +{".mp3", "audio/mpeg"}. +{".ogg", "audio/ogg"}. +{".m4a", "audio/x-m4a"}. +{".ra", "audio/x-realaudio"}. +{".gif", "image/gif"}. +{".jpeg", "image/jpeg"}. +{".jpg", "image/jpeg"}. +{".png", "image/png"}. +{".svg", "image/svg+xml"}. +{".svgz", "image/svg+xml"}. +{".tif", "image/tiff"}. +{".tiff", "image/tiff"}. +{".wbmp", "image/vnd.wap.wbmp"}. +{".webp", "image/webp"}. +{".ico", "image/x-icon"}. +{".jng", "image/x-jng"}. +{".bmp", "image/x-ms-bmp"}. +{".css", "text/css"}. +{".html", "text/html"}. +{".htm", "text/html"}. +{".shtml", "text/html"}. +{".mml", "text/mathml"}. +{".txt", "text/plain"}. +{".jad", "text/vnd.sun.j2me.app-descriptor"}. +{".wml", "text/vnd.wap.wml"}. +{".htc", "text/x-component"}. +{".xml", "text/xml"}. +{".3gpp", "video/3gpp"}. +{".3gp", "video/3gpp"}. +{".ts", "video/mp2t"}. +{".mp4", "video/mp4"}. +{".mpeg", "video/mpeg"}. +{".mpg", "video/mpeg"}. +{".mov", "video/quicktime"}. +{".webm", "video/webm"}. +{".flv", "video/x-flv"}. +{".m4v", "video/x-m4v"}. +{".mkv", "video/x-matroska"}. +{".mng", "video/x-mng"}. +{".asx", "video/x-ms-asf"}. +{".asf", "video/x-ms-asf"}. +{".wmv", "video/x-ms-wmv"}. +{".avi", "video/x-msvideo"}. diff --git a/storage/src/Makefile b/storage/src/Makefile new file mode 100644 index 0000000..fb62b6c --- /dev/null +++ b/storage/src/Makefile @@ -0,0 +1,19 @@ +.PHONY: all +.SUFFIXES: .erl .beam + +ERLC?= erlc -server + +ERLOPTS+= -I ../include + + +OBJS= storage.beam storage_app.beam +OBJS+= storage_supervisor.beam + +all: ${OBJS} + +.erl.beam: + ${ERLC} ${ERLOPTS} ${ERLFLAGS} $< + +clean: + rm -f *.beam + diff --git a/storage/src/storage.erl b/storage/src/storage.erl new file mode 100644 index 0000000..411998a --- /dev/null +++ b/storage/src/storage.erl @@ -0,0 +1,820 @@ +% +% Copyright (c) 2024 Andrea Biscuola +% +% Permission to use, copy, modify, and distribute this software for any +% purpose with or without fee is hereby granted, provided that the above +% copyright notice and this permission notice appear in all copies. +% +% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +% +-module(storage). +-moduledoc """ +Object storage module + +This module provide functions to interact with the storage application. + +Functionalities includes reading and writing records, creating and +deleting buckets and managing their size and replication across a +cluster. + +Objects stored in a bucket are defined as: + +``` +-record(object, {key :: term(), + value :: term(), + tags = [] :: list(term()), + metadata = [] :: list({term(), term()})}). +``` + +To import the object definition in your module, include the "storage.hrl" +file: + +``` +-include_lib("storage/include/storage.hrl"). +``` +""". + +-behaviour(gen_server). + +% Server start function +-export([start_link/0]). + +% Public bucket operation functions +-export([add_node/1, add_replica/2, add_replica/3, + create/1, create/2, create/3, + delete_replica/2, grow/1, grow/2, grow/3, + move/3, remove/1, remove/2, shrink/1, + shrink/2]). + +% Public record operation functions +-export([delete/2, list/2, read/2, write/3, write/4]). + +% Module callbacks +-export([code_change/3, + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2]). + +-include_lib("stdlib/include/qlc.hrl"). +-include_lib("storage/include/storage.hrl"). + +-define(MIMEFILE, "/mimes.txt"). +-define(DEFMIME, "application/octect-stream"). + +-type object() :: #object{}. +-type metadata() :: [{atom(), term()}]. +-export_type([object/0, metadata/0]). + +-record(mime, {ext, mtype}). + +% +% Start functions. +% + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +% +% Buckets management functions. +% + +-doc """ +Add a node to the storage cluster. + +Spec: + +``` +-spec add_node(Node) -> ok | {error, Reason} when + Node :: atom(), + Reason :: term(). +``` + +Example: + +``` +add_node('b@piwa'). +ok +``` + +Bootstrap and add the node `b@piwa` to the storage cluster. +""". +-spec add_node(Node) -> ok | {error, Reason} when + Node :: atom(), + Reason :: term(). + +add_node(Node) -> + gen_server:call(?MODULE, {node, Node}). + +-doc """ +Create a replica of the given `Bucket` on `Node`. + +Spec: + +``` +-spec add_replica(Bucket, Node, Type) -> ok | {error, Reason} when + Bucket :: atom(), + Node :: atom(), + Type :: disc_copies | disc_only_copies | ram_copies, + Reason :: term(). +``` + +`Type` determines the kind of replica the destination node will manage, +be it on disc, RAM, or both. The possible values for `Type` means: + +- ram_copies: the replica will be kept only in RAM on the destination node. +- disc_copies: the replica will be in RAM and also on disc. +- disc_only_copies: the replica will reside on disc only. + +Example: + +Add a disk-only replica of the bucket "objects", on the node "pri@lab": + +``` +add_replica(objects, 'pri@lab', disc_only_copies). +ok +``` +""". +-spec add_replica(Bucket, Node, Type) -> ok | {error, Reason} when + Bucket :: atom(), + Node :: atom(), + Type :: disc_copies | disc_only_copies | ram_copies, + Reason :: term(). + +add_replica(Bucket, Node, Type) -> + gen_server:call(?MODULE, {replica, Bucket, Node, Type}). + +-doc """ +Same as: + +``` +add_replica(Bucket, Node, disc_only_copies). +``` +""". +-spec add_replica(Bucket, Node) -> ok | {error, Reason} when + Bucket :: atom(), + Node :: atom(), + Reason :: term(). + +add_replica(Bucket, Node) -> + gen_server:call(?MODULE, {replica, Bucket, Node, disc_only_copies}). + +-doc """ +Same as + +``` +create(Bucket, [node()], disc_only_copies). +``` +""". +-spec create(Bucket) -> ok | {error, Reason} when + Bucket :: atom(), + Reason :: term(). + +create(Bucket) -> + gen_server:call(?MODULE, {create, Bucket, [node()], disc_only_copies}). + +-doc """ +Same as + +``` +create(Bucket, Nodes, disc_only_copies). +``` +""". +-spec create(Bucket, Nodes) -> ok | {error, Reason} when + Bucket :: atom(), + Nodes :: [atom()], + Reason :: term(). + +create(Bucket, Nodes) -> + gen_server:call(?MODULE, {create, Bucket, Nodes, disc_only_copies}). + +-doc """ +Create a new `Bucket` on the specified list of `Nodes`. + +Spec: + +``` +-spec create(Bucket, Nodes, Type) -> ok | {error, Reason} when + Bucket :: atom(), + Nodes :: [atom()], + Type :: ram_copies | disc_copies | disc_only_copies, + Reason :: term(). +``` + +The possible values for `Type` means: + +- ram_copies: all the bucket copies will be kept only in RAM +- disc_copies: all the bucket copies will be kept in RAM and also on disc. +- disc_only_copies: all the bucket copies will be kept on disc only. + +The bucket will be created with the default size of 2GB, and a replica +will be setup between the given nodes. + +Example: + +``` +create(objects, ['lab1@pri', 'lab2@sec'], disc_only_copies). +ok +``` + +Creates the bucket "objects", of 2GB, on 'lab1@pri' and setup a replica +on 'lab2@sec'. Both the copies will be on disk only. + +If you have different needs, create a single bucket instance and then +add replicas of it calling the `add_replica/3` function. +""". +-spec create(Bucket, Nodes, Type) -> ok | {error, Reason} when + Bucket :: atom(), + Nodes :: [atom()], + Type :: ram_copies | disc_copies | disc_only_copies, + Reason :: term(). + +create(Bucket, Nodes, Type) -> + gen_server:call(?MODULE, {create, Bucket, Nodes, Type}). + +-doc """ +Delete a bucket replica. + +Spec: + +``` +-spec delete_replica(Bucket, Node) -> ok | {error, Reason} when + Bucket :: atom(), + Node :: atom(), + Reason :: term(). +``` + +Delete the `Bucket` replica from the given `Node`. +""". +-spec delete_replica(Bucket, Node) -> ok | {error, Reason} when + Bucket :: atom(), + Node :: atom(), + Reason :: term(). + +delete_replica(Bucket, Node) -> + gen_server:call(?MODULE, {delete_replica, Bucket, Node}). + +-doc """ +Same as: + +``` +grow(Bucket, [node()], 1). +``` +""". +-spec grow(Bucket) -> ok | {error, Reason} when + Bucket :: atom(), + Reason :: term(). + +grow(Bucket) -> + gen_server:call(?MODULE, {grow, Bucket, [node()], 1}). + +-doc """ +Same as: + +``` +grow(Bucket, Nodes, 1). +``` +""". +-spec grow(Bucket, Nodes) -> ok | {error, Reason} when + Bucket :: atom(), + Nodes :: [atom()], + Reason :: term(). + +grow(Bucket, Nodes) -> + gen_server:call(?MODULE, {grow, Bucket, Nodes, 1}). + +-doc """ +Add additional partitions to a bucket. + +Spec: + +``` +-spec grow(Bucket, Nodes, Num) -> ok | {error, Reason} when + Bucket :: atom(), + Nodes :: [atom()], + Num :: pos_integer(), + Reason :: term(). +``` + +`Num` is the number of partitions to add to theexisting `Bucket`. Every +added partition, will increase the bucket size by 2GB. +That is, the total size increase of the bucket will be `Num` * 2GB. + +The created partitions are going to be replicated on the given list of `Nodes`, +with priority given to nodes based on the order of the list. + +Example: + +Assuming the "objects" bucket exists: + +``` +grow(objects, ['lab1@pri', 'lab2@sec'], 4]). +ok +``` + +Adds 4 new partitions to the "objects" bucket, on 'lab1@pri', replicating +them on 'lab2@sec' with a total size increase of 4 * 2GB = 8GB. +""". +-spec grow(Bucket, Nodes, Num) -> ok | {error, Reason} when + Bucket :: atom(), + Nodes :: [atom()], + Num :: pos_integer(), + Reason :: term(). + +grow(Bucket, Nodes, Num) -> + gen_server:call(?MODULE, {grow, Bucket, Nodes, Num}). + +-doc """ +Move a bucket replica copy between nodes. + +Spec: + +``` +-spec move(Bucket, Source, Dest) -> ok | {error, Reason} when + Bucket :: atom(), + Source :: atom(), + Dest :: atom(), + Reason :: term(). +``` + +The copy of `Bucket` living on `Source`, will be moved to `Dest` and deleted +from the source node once the operation is completed. +""". +-spec move(Bucket, Source, Dest) -> ok | {error, Reason} when + Bucket :: atom(), + Source :: atom(), + Dest :: atom(), + Reason :: term(). + +move(Bucket, Source, Dest) -> + gen_server:call(?MODULE, {move, Bucket, Source, Dest}). + +-doc """ +Delete the given `Bucket` and all it's replicas. + +Spec: + +``` +-spec remove(Bucket) -> ok | {error, Reason} when + Bucket :: atom(), + Reason :: term(). +``` +""". +-spec remove(Bucket) -> ok | {error, Reason} when + Bucket :: atom(), + Reason :: term(). + +remove(Bucket) -> + gen_server:call(?MODULE, {remove, Bucket}). + +-doc """ +Delete the replica of the given `Bucket` from Node. + +Spec: + +``` +-spec remove(Bucket, Node) -> ok | {error, Reason} when + Bucket :: atom(), + Node :: atom(), + Reason :: term(). +``` +""". +-spec remove(Bucket, Node) -> ok | {error, Reason} when + Bucket :: atom(), + Node :: atom(), + Reason :: term(). + +remove(Bucket, Node) -> + gen_server:call(?MODULE, {remove, Bucket, Node}). + +-doc """ +Same as: + +``` +shrink(Bucket, 1). +""". +-spec shrink(Bucket) -> ok | {error, Reason} when + Bucket :: atom(), + Reason :: term(). + +shrink(Bucket) -> + gen_server:call(?MODULE, {shrink, Bucket, 1}). + +-doc """ +Delete bucket partitions. + +Spec: + +``` +-spec shrink(Bucket, Num) -> ok | {error, Reason} when + Bucket :: atom(), + Num :: pos_integer(), + Reason :: term(). +``` + +Remove `Num` partitions from the given `Bucket`. Objects stored +on the deleted partitions are moved to the remaining ones automatically. + +Removing partitions will shrink the total bucket size of 2GB * `Num`. + +Example: + +``` +shrink(objects, 6). +ok +``` + +Removes 6 partitions from the "objects" bucket, reducing it's size by +2GB * 6 = 12GB. +""". +-spec shrink(Bucket, Num) -> ok | {error, Reason} when + Bucket :: atom(), + Num :: pos_integer(), + Reason :: term(). + +shrink(Bucket, Num) -> + gen_server:call(?MODULE, {shrink, Bucket, Num}). + +% +% Record operation functions. +% + +-doc """ +Delete an object from a bucket. + +Spec: + +``` +-spec delete(Bucket, Key) -> ok | {error, Reason} when + Bucket :: atom(), + Key :: term(), + Reason :: term(). +``` + +""". +-spec delete(Bucket, Key) -> ok | {error, Reason} when + Bucket :: atom(), + Key :: term(), + Reason :: term(). + +delete(Bucket, Key) -> + gen_server:call(?MODULE, {delete, Bucket, Key}). + +-doc """ +Spec: + +``` +-spec list(Bucket, Metadata) -> {ok, Records} when + Bucket :: atom(), + Metadata :: metadata(), + Records :: [object()]. +``` +Read a list of objects from a bucket. + +Returned objects are the ones tagged with `all` the tags in the tags +list of it's metadata field. + +Example: + +``` +{ok, Objs} = list(objects, [{tags, [foo, bar]}]). +Objs. +[#object{key = o1, + tags = [foo, bar], + value = {ipse, dixit}, + metadata = [{tags, [foo, bar]},{mime, 'application/octect-stream'}] + }, #object{key = o2, + tags = [foo, bar], + value = 42, + metadata = [{tags, [foo, bar]},{mime, 'application/octect-stream'}] + } +] + +""". +-spec list(Bucket, Metadata) -> {ok, Records} when + Bucket :: atom(), + Metadata :: metadata(), + Records :: [object()]. + +list(Bucket, Metadata) -> + gen_server:call(?MODULE, {list, Bucket, Metadata}). + +-doc """ +Read an object from a bucket. + +Spec: + +``` +-spec read(Bucket, Key) -> {ok, Record} | {error, Reason} when + Bucket :: atom(), + Key :: term(), + Record :: [object()], + Reason :: term(). +``` + +Example: + +``` +{ok, [R]} = read(objects, foo). +R. +#object{key = o1,value = {ipse, dixit}, + metadata = [{tags, [foo, bar]},{mime, 'application/octet-stream'}]} +``` +""". +-spec read(Bucket, Key) -> {ok, Record} | {error, Reason} when + Bucket :: atom(), + Key :: term(), + Record :: [object()], + Reason :: term(). + +read(Bucket, Key) -> + gen_server:call(?MODULE, {read, Bucket, Key}). + +-doc """ +Same as: + +``` +write(Bucket, Key, Term, []). +``` + +""". +-spec write(Bucket, Key, Term) -> ok | {error, Reason} when + Bucket :: atom(), + Key :: term(), + Term :: term(), + Reason :: term(). + +write(Bucket, Key, Term) -> + gen_server:call(?MODULE, {write, Bucket, Key, Term}). + +-doc """ +Store an object in a provided bucket. + +Spec: + +``` +-spec write(Bucket, Key, Term, Metadata) -> ok | {error, Reason} when + Bucket :: atom(), + Key :: term(), + Term :: term(), + Metadata :: metadata(), + Reason :: term(). +``` + +Store an object identified by `Key` with value `Term` in `Bucket`. The +key must be unique and writing an object with the same key as an +existing one will cause an update of the existing object. + +`Metadata` represent additional data attached to the stored object, +useful when later searching for objects using `list/2`. + +Example: + +``` +write(objects, hello, world, [{tags, [blog, post]}]). +ok +``` + +Store an object identified by the key "hello" in the "objects" bucket, +tagged with the atoms `blog` and `post` in it's metadata. +""". +-spec write(Bucket, Key, Term, Metadata) -> ok | {error, Reason} when + Bucket :: atom(), + Key :: term(), + Term :: term(), + Metadata :: metadata(), + Reason :: term(). + +write(Bucket, Key, Term, Metadata) -> + gen_server:call(?MODULE, {write, Bucket, Key, Term, Metadata}). + +% +% Module callbacks. +% + +code_change(_OldVsn, N, _Extra) -> {ok, N}. + +init([]) -> + process_flag(trap_exit, true), + + mnesia:create_table(mime, [{attributes, record_info(fields, mime)}, + {ram_copies, [node()]}, {local_content, true}]), + + F= fun() -> + {ok, App} = application:get_application(?MODULE), + {ok, M} = file:consult(lists:append(code:priv_dir(App), ?MIMEFILE)), + + [mnesia:write(#mime{ext = E, mtype = T}) || {E, T} <- M] + end, + + mnesia:transaction(F), + + {ok, 0}. + +terminate(_Reason, _N) -> + mnesia:delete_table(mime), + ok. + +% +% Bucket operations callbacks. +% + +handle_call({node, Node}, _From, State) -> + ok = rpc:call(Node, mnesia, start, []), + {ok, _} = mnesia:change_config(extra_db_nodes, [Node]), + case mnesia:change_table_copy_type(schema, Node, disc_copies) of + {atomic, ok} -> {reply, ok, State}; + {aborted, Reason} -> {reply, {error, Reason}, State} + end; + +handle_call({create, Bucket, Nodes, Type}, _From, State) -> + mnesia:create_table(Bucket, [{attributes, record_info(fields, object)}, + {Type, Nodes}, {record_name, object}]), + + case mnesia:change_table_frag(Bucket, {activate, []}) of + {atomic, ok} -> {reply, ok, State}; + {aborted, Reason} -> {reply, {error, Reason}, State} + end; + +handle_call({delete_replica, Bucket, Node}, _From, State) -> + case mnesia:del_table_copy(Bucket, Node) of + {atomic, _} -> {reply, ok, State}; + {aborted, Reason} -> {reply, {error, Reason}, State} + end; + +handle_call({grow, Bucket, Nodes, Num}, _From, State) -> + case grow_bucket(Bucket, Nodes, Num) of + ok -> {reply, ok, State}; + {error, Reason} -> {reply, {error, Reason}, State} + end; + +handle_call({move, Bucket, Source, Dest}, _From, State) -> + case mnesia:move_table_copy(Bucket, Source, Dest) of + {atomic, ok} -> {reply, ok, State}; + {aborted, Reason} -> {reply, {error, Reason}, State} + end; + +handle_call({remove, Bucket}, _From, State) -> + case mnesia:delete_table(Bucket) of + {atomic, ok} -> {reply, ok, State}; + {aborted, Reason} -> {reply, {error, Reason}, State} + end; + +handle_call({remove, Bucket, Node}, _From, State) -> + case mnesia:del_table_copy(Bucket, Node) of + {atomic, ok} -> {reply, ok, State}; + {aborted, Reason} -> {reply, {error, Reason}, State} + end; + +handle_call({replica, Bucket, Node, Type}, _From, State) -> + case mnesia:add_table_copy(Bucket, Node, Type) of + {atomic, _} -> {reply, ok, State}; + {aborted, Reason} -> {reply, {error, Reason}, State} + end; + +handle_call({shrink, Bucket, Num}, _From, State) -> + case shrink_bucket(Bucket, Num) of + ok -> {reply, ok, State}; + {error, Reason} -> {reply, {error, Reason}, State} + end; + +% +% Record operation callbacks. +% + +% +% Read callbacks. +% + +handle_call({list, Bucket, Metadata}, _From, State) -> + case search(Bucket, Metadata) of + {ok, Records} -> {reply, {ok, Records}, State}; + {error, Reason} -> {reply, {error, Reason}, State} + end; + +handle_call({read, Bucket, Key}, _From, State) -> + F = fun() -> + mnesia:read(Bucket, Key) + end, + + case mnesia:transaction(F) of + {atomic, R} -> {reply, {ok, R}, State}; + {aborted, Reason} -> {reply, {error, Reason}, State} + end; + +% +% Write callbacks. +% + +handle_call({delete, Bucket, Key}, _From, State) -> + F = fun() -> + mnesia:delete({Bucket, Key}) + end, + + case mnesia:transaction(F) of + {atomic, ok} -> {reply, ok, State}; + {aborted, Reason} -> {reply, {error, Reason}, State} + end; + +handle_call({write, Bucket, Key, Term}, _From, State) -> + handle_call({write, Bucket, Key, Term, []}, _From, State); + +handle_call({write, Bucket, Key, Term, Metadata}, _From, State) -> + Ex = filename:extension(Key), + + F = fun() -> + M = case mnesia:read(mime, Ex) of + [] -> ?DEFMIME; + [Mt] -> Mt#mime.mtype + end, + + R = #object{key = Key, value = Term, metadata = lists:flatten([{mime, M}, Metadata])}, + mnesia:write(Bucket, R, write) + end, + + case mnesia:transaction(F) of + {atomic, _} -> {reply, ok, State}; + {aborted, Reason} -> {reply, {error, Reason}, State} + end. + +handle_cast(_Msg, State) -> {noreply, State}. + +handle_info(_Info, State) -> {noreply, State}. + +% +% Private functions. +% + +-spec grow_bucket(Bucket, Nodes, Num) -> ok | {error, Reason} when + Bucket :: atom(), + Nodes :: [atom()], + Num :: pos_integer(), + Reason :: term(). + +grow_bucket(_Bucket, _Nodes, 0) -> ok; +grow_bucket(Bucket, Nodes, Num) when is_integer(Num) -> + case mnesia:change_table_frag(Bucket, {add_frag, Nodes}) of + {atomic, ok} -> + grow_bucket(Bucket, Nodes, Num - 1); + {aborted, Reason} -> + {error, Reason} + end. + +-spec shrink_bucket(Bucket, Num) -> ok | {error, Reason} when + Bucket :: atom(), + Num :: pos_integer(), + Reason :: term(). + +shrink_bucket(_Bucket, 0) -> ok; +shrink_bucket(Bucket, Num) when is_integer(Num) -> + case mnesia:change_table_frag(Bucket, del_frag) of + {atomic, ok} -> + shrink_bucket(Bucket, Num - 1); + {aborted, Reason} -> + {error, Reason} + end. + +-spec search(Bucket, Metadata) -> {ok, Objects} | {error, Reason} when + Bucket :: atom(), + Metadata :: metadata(), + Objects :: [object()], + Reason :: term(). + +search(Bucket, Metadata) -> + Q = qlc:q([X || X <- mnesia:table(Bucket), filter(Metadata, X#object.metadata)]), + + F = fun() -> + qlc:e(Q) + end, + + case mnesia:transaction(F) of + {atomic, L} -> {ok, L}; + {aborted, Reason} -> {error, Reason} + end. + +% +% A record is reported if and only if it contains all the key-value +% pairs passed in the M list. +% +% As a nice side effect, passing an empty metadata list `[]`, return +% the record itself. Useful to list all ther records in a specific table. +% +filter([], _M) -> true; +filter([H | T], M) -> + {K, V} = H, + + case lists:keyfind(K, 1, M) of + false -> false; + {_Key, Val} -> + if + Val =:= V -> filter(T, M); + true -> false + end + end. \ No newline at end of file diff --git a/storage/src/storage_app.erl b/storage/src/storage_app.erl new file mode 100644 index 0000000..60bef8f --- /dev/null +++ b/storage/src/storage_app.erl @@ -0,0 +1,25 @@ +% +% Copyright (c) 2024 Andrea Biscuola +% +% Permission to use, copy, modify, and distribute this software for any +% purpose with or without fee is hereby granted, provided that the above +% copyright notice and this permission notice appear in all copies. +% +% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +% +-module(storage_app). +-behaviour(application). + +-export([start/2, + stop/1]). + +start(_Type, StartArgs) -> + storage_supervisor:start_link(StartArgs). + +stop(_State) -> ok. \ No newline at end of file diff --git a/storage/src/storage_supervisor.erl b/storage/src/storage_supervisor.erl new file mode 100644 index 0000000..4aaed8f --- /dev/null +++ b/storage/src/storage_supervisor.erl @@ -0,0 +1,31 @@ +% +% Copyright (c) 2024 Andrea Biscuola +% +% Permission to use, copy, modify, and distribute this software for any +% purpose with or without fee is hereby granted, provided that the above +% copyright notice and this permission notice appear in all copies. +% +% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +% +-module(storage_supervisor). +-behaviour(supervisor). + +-export([start/0, + start_link/1, + init/1]). + +start() -> + spawn(fun() -> supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []) end). + +start_link(Args) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, Args). + +init([]) -> + {ok, {{one_for_one, 3, 10}, [{tag1, {storage, start_link, []}, permanent, + 10000, worker, [storage]}]}}. \ No newline at end of file