diff --git a/.prettierrc.js b/.prettierrc.js index 6e8b7f6..e617454 100644 --- a/.prettierrc.js +++ b/.prettierrc.js @@ -1,6 +1,6 @@ module.exports = { trailingComma: "es5", tabWidth: 2, - semi: false, + semi: true, singleQuote: false, -} +}; diff --git a/lib/ckd/application.ex b/lib/ckd/application.ex index 9d486dc..71a7856 100644 --- a/lib/ckd/application.ex +++ b/lib/ckd/application.ex @@ -6,6 +6,10 @@ defmodule Ckd.Application do use Application @dero_wallet_worker_registry :dero_wallet_worker_registry + @dero_queue_worker_registry :dero_queue_worker_registry + @dero_transfer_worker_registry :dero_transfer_worker_registry + @m_worker_registry :m_worker_registry + @rtbl_worker_registry :rtbl_worker_registry def start(_type, _args) do children = [ @@ -18,8 +22,16 @@ defmodule Ckd.Application do # State store {Ckd.State, [statefile: statefile_path()]}, # Services - Ckd.DeroWallet, {Registry, [keys: :unique, name: @dero_wallet_worker_registry]}, + {Registry, [keys: :unique, name: @dero_transfer_worker_registry]}, + {Registry, [keys: :unique, name: @dero_queue_worker_registry]}, + {Registry, [keys: :unique, name: @m_worker_registry]}, + {Registry, [keys: :unique, name: @rtbl_worker_registry]}, + Ckd.DeroWallet, + Ckd.DeroTransfer, + Ckd.DeroQueue, + Ckd.M, + Ckd.Rtbl, # Start the Endpoint (http/https) CkdWeb.Endpoint # Start a worker by calling: Ckd.Worker.start_link(arg) @@ -42,6 +54,18 @@ defmodule Ckd.Application do @spec dero_wallet_worker_registry :: :dero_wallet_worker_registry def dero_wallet_worker_registry, do: @dero_wallet_worker_registry + @spec dero_queue_worker_registry :: :dero_queue_worker_registry + def dero_queue_worker_registry, do: @dero_queue_worker_registry + + @spec dero_transfer_worker_registry :: :dero_transfer_worker_registry + def dero_transfer_worker_registry, do: @dero_transfer_worker_registry + + @spec m_worker_registry :: :m_worker_registry + def m_worker_registry, do: @m_worker_registry + + @spec rtbl_worker_registry :: :rtbl_worker_registry + def rtbl_worker_registry, do: @rtbl_worker_registry + # Tell Phoenix to update the endpoint configuration # whenever the application is updated. def config_change(changed, _new, removed) do diff --git a/lib/ckd/dero_rpc.ex b/lib/ckd/dero_rpc.ex deleted file mode 100644 index 88ccb68..0000000 --- a/lib/ckd/dero_rpc.ex +++ /dev/null @@ -1,102 +0,0 @@ -defmodule Ckd.DeroRPC do - @moduledoc false - - @spec get_address(%{ - :wallet_pass => binary, - :wallet_url => binary, - :wallet_user => binary, - optional(any) => any - }) :: - {:error, any} - | {:ok, any} - def get_address( - dero = %{wallet_url: wallet_url, wallet_user: wallet_user, wallet_pass: wallet_pass} - ) - when is_binary(wallet_url) and is_binary(wallet_user) and is_binary(wallet_pass) do - request(wallet_creds(dero), %{method: "GetAddress"}) - end - - @spec get_height(%{ - :wallet_pass => binary, - :wallet_url => binary, - :wallet_user => binary, - optional(any) => any - }) :: - {:error, any} - | {:ok, any} - def get_height( - dero = %{wallet_url: wallet_url, wallet_user: wallet_user, wallet_pass: wallet_pass} - ) - when is_binary(wallet_url) and is_binary(wallet_user) and is_binary(wallet_pass) do - request(wallet_creds(dero), %{method: "GetHeight"}) - end - - @spec get_balance(%{ - :wallet_pass => binary, - :wallet_url => binary, - :wallet_user => binary, - optional(any) => any - }) :: - {:error, any} - | {:ok, any} - def get_balance( - dero = %{wallet_url: wallet_url, wallet_user: wallet_user, wallet_pass: wallet_pass} - ) - when is_binary(wallet_url) and is_binary(wallet_user) and is_binary(wallet_pass) do - request(wallet_creds(dero), %{method: "GetBalance"}) - end - - def get_transfers( - dero = %{wallet_url: wallet_url, wallet_user: wallet_user, wallet_pass: wallet_pass}, - min_height: min_height - ) - when is_binary(wallet_url) and is_binary(wallet_user) and is_binary(wallet_pass) and - is_number(min_height) do - request(wallet_creds(dero), %{ - method: "GetTransfers", - params: %{ - in: true, - out: true, - min_height: min_height, - max_height: 0 - } - }) - end - - defp request(creds = {base_url, user, pass}, body = %{method: method}) - when is_binary(base_url) and is_binary(user) and is_binary(pass) and is_binary(method) and - is_map(body) do - request = Map.merge(%{jsonrpc: "2.0", id: "1"}, body) - - case Tesla.post(client(creds), "/json_rpc", request) do - {:ok, %{body: %{"result" => result}}} -> - {:ok, result} - - {:ok, - %{body: %{"error" => %{"code" => -32_098, "message" => message = "Account Unregistered"}}}} -> - {:error, code: :account_unregistered, message: message} - - {:error, message} -> - {:error, message} - end - end - - defp client({base_url, user, pass}) - when is_binary(base_url) and is_binary(user) and is_binary(pass) do - middleware = [ - {Tesla.Middleware.BaseUrl, base_url}, - {Tesla.Middleware.BasicAuth, username: user, password: pass}, - Tesla.Middleware.JSON - # Tesla.Middleware.Logger - ] - - Tesla.client(middleware) - end - - defp wallet_creds( - _dero = %{wallet_url: wallet_url, wallet_user: wallet_user, wallet_pass: wallet_pass} - ) - when is_binary(wallet_url) and is_binary(wallet_user) and is_binary(wallet_pass) do - {wallet_url, wallet_user, wallet_pass} - end -end diff --git a/lib/ckd/dero_wallet_worker.ex b/lib/ckd/dero_wallet_worker.ex deleted file mode 100644 index 3c94f16..0000000 --- a/lib/ckd/dero_wallet_worker.ex +++ /dev/null @@ -1,111 +0,0 @@ -defmodule Ckd.DeroWalletWorker do - @moduledoc false - - use GenServer - require Logger - - @registry Ckd.Application.dero_wallet_worker_registry() - @fetch_info_interval 1_000 - @dero_divisor 100_000 - - def start_link(aid) when is_binary(aid), - do: GenServer.start_link(__MODULE__, aid, name: via_tuple(aid)) - - def init(aid) when is_binary(aid) do - {:ok, _existing_account = %{}} = Ckd.State.subscribe_info({:account, aid}) - {:ok, %{aid: aid, subs: [], dero: nil, info: nil}} - end - - def handle_info({{:account, _aid}, nil}, _state), - do: Process.exit(self(), :normal) - - def handle_info( - {{:account, aid}, %{dero: same_dero}}, - state = %{aid: aid, dero: same_dero} - ), - do: {:no_reply, state} - - def handle_info( - {{:account, aid}, %{dero: new_dero}}, - state = %{aid: aid} - ) do - send(self(), {:fetch_info, dero: new_dero}) - {:noreply, Map.put(state, :dero, new_dero)} - end - - def handle_info( - msg = {:fetch_info, dero: dero}, - state = %{aid: aid, dero: dero, info: old_info, subs: subs} - ) do - with {:ok, %{"address" => address}} <- Ckd.DeroRPC.get_address(dero), - {:ok, %{"height" => height}} <- Ckd.DeroRPC.get_height(dero), - {:ok, %{"balance" => balance, "unlocked_balance" => unlocked_balance}} <- - Ckd.DeroRPC.get_balance(dero) do - min_height = calc_min_height(old_info) - - new_transfers = - case Ckd.DeroRPC.get_transfers(dero, min_height: min_height) do - {:ok, %{"entries" => new_transfers}} -> new_transfers - {:ok, _} -> [] - end - - info = %{ - address: address, - height: height, - balance: balance / @dero_divisor, - unlocked_balance: unlocked_balance / @dero_divisor, - transfers: - case old_info do - nil -> new_transfers - %{transfers: old_transfers} -> old_transfers ++ new_transfers - end - } - - if info != old_info do - Enum.each(subs, fn {pid, info_key = {:dero_wallet, ^aid}} -> - send(pid, {info_key, info}) - end) - end - - Process.send_after(self(), msg, @fetch_info_interval) - - {:noreply, Map.put(state, :info, info)} - else - {:error, message} -> - Logger.error(message) - Process.send_after(self(), msg, @fetch_info_interval * 10) - {:noreply, state} - end - end - - def handle_call({:fetch_info, dero: _stale_dero}, _from, state), do: {:noreply, state} - - def handle_call( - {:subscribe_info, info_key = {:dero_wallet, aid}}, - {from_pid, _}, - state = %{aid: aid, subs: subs} - ) - when is_binary(aid) and is_list(subs) do - new_state = Map.put(state, :subs, subs ++ [{from_pid, info_key}]) - send(from_pid, {info_key, new_state.info}) - {:reply, {:ok, new_state.info}, new_state} - end - - def handle_call( - {:unsubscribe_info, info_key = {:dero_wallet, aid}}, - {from_pid, _}, - state = %{aid: aid, subs: subs} - ) - when is_binary(aid) and is_list(subs) do - {:reply, :ok, - Map.put(state, :subs, Enum.reject(subs, fn sub -> sub == {from_pid, info_key} end))} - end - - defp calc_min_height(_old_info = %{transfers: transfers}), - do: Enum.map(transfers, fn t -> t["height"] + 1 end) |> Enum.max(fn -> 0 end) - - defp calc_min_height(_old_info = nil), do: 0 - - defp via_tuple(aid) when is_binary(aid), - do: {:via, Registry, {@registry, aid}} -end diff --git a/lib/ckd/services/dero/dero_queue.ex b/lib/ckd/services/dero/dero_queue.ex new file mode 100644 index 0000000..ad92067 --- /dev/null +++ b/lib/ckd/services/dero/dero_queue.ex @@ -0,0 +1,96 @@ +defmodule Ckd.DeroQueue do + @moduledoc false + + use DynamicSupervisor + + alias Ckd.DeroQueueWorker + + @spec start_link(any) :: :ignore | {:error, any} | {:ok, pid} + def start_link(_), do: DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__) + + @impl true + @spec init(any) :: + {:ok, + %{ + extra_arguments: list, + intensity: non_neg_integer, + max_children: :infinity | non_neg_integer, + period: pos_integer, + strategy: :one_for_one + }} + def init(_), do: DynamicSupervisor.init(strategy: :one_for_one) + + @spec subscribe_info( + {:dero_queue, binary} + | {:dero_queue_item, [{:aid, binary} | {:qid, binary}, ...]} + | {:dero_queue_pending_item, binary} + ) :: any + def subscribe_info(key = {:dero_queue, aid}) when is_binary(aid), + do: GenServer.call(worker_pid(aid), {:subscribe_info, key}) + + def subscribe_info(key = {:dero_queue_item, aid: aid, qid: qid}) + when is_binary(aid) and is_binary(qid), + do: GenServer.call(worker_pid(aid), {:subscribe_info, key}) + + def subscribe_info(key = {:dero_queue_pending_item, aid}) when is_binary(aid), + do: GenServer.call(worker_pid(aid), {:subscribe_info, key}) + + @spec unsubscribe_info( + {:dero_queue, binary} + | {:dero_queue_item, [{:aid, binary} | {:qid, binary}, ...]} + | {:dero_queue_pending_item, binary} + ) :: any + def unsubscribe_info(key = {:dero_queue, aid}) when is_binary(aid), + do: GenServer.call(worker_pid(aid), {:unsubscribe_info, key}) + + def unsubscribe_info(key = {:dero_queue_item, aid: aid, qid: qid}) + when is_binary(aid) and is_binary(qid), + do: GenServer.call(worker_pid(aid), {:unsubscribe_info, key}) + + def unsubscribe_info(key = {:dero_queue_pending_item, aid}) when is_binary(aid), + do: GenServer.call(worker_pid(aid), {:unsubscribe_info, key}) + + @spec read( + {:dero_queue, binary} + | {:dero_queue_item, [{:aid, binary} | {:qid, binary}, ...]} + | {:dero_queue_pending_item, binary} + ) :: any + def read(key = {:dero_queue, aid}) when is_binary(aid), + do: GenServer.call(worker_pid(aid), {:read, key}) + + def read(key = {:dero_queue_item, aid: aid, qid: qid}) + when is_binary(aid) and is_binary(qid), + do: GenServer.call(worker_pid(aid), {:read, key}) + + def read(key = {:dero_queue_pending_item, aid}) when is_binary(aid), + do: GenServer.call(worker_pid(aid), {:read, key}) + + def queue_action(aid, action = {name, opts}) + when is_binary(aid) and is_atom(name) and is_map(opts), + do: GenServer.call(worker_pid(aid), {:queue_action, aid, action}) + + @spec authorize_action([{:aid, binary} | {:qid, binary}, ...]) :: any + def authorize_action(aid: aid, qid: qid) when is_binary(aid) and is_binary(qid), + do: GenServer.call(worker_pid(aid), {:authorize_action, qid}) + + @spec cancel_action([{:aid, binary} | {:qid, binary}, ...]) :: any + def cancel_action(aid: aid, qid: qid) when is_binary(aid) and is_binary(qid), + do: GenServer.call(worker_pid(aid), {:cancel_action, qid}) + + defp worker_pid(aid) when is_binary(aid), + do: + (case(maybe_worker(aid)) do + [{pid, _}] -> + pid + + [] -> + {:ok, pid} = start_worker(aid) + pid + end) + + defp maybe_worker(aid) when is_binary(aid), + do: Registry.lookup(Ckd.Application.dero_queue_worker_registry(), aid) + + defp start_worker(aid) when is_binary(aid), + do: DynamicSupervisor.start_child(__MODULE__, {DeroQueueWorker, aid}) +end diff --git a/lib/ckd/services/dero/dero_queue_worker.ex b/lib/ckd/services/dero/dero_queue_worker.ex new file mode 100644 index 0000000..2b6fc18 --- /dev/null +++ b/lib/ckd/services/dero/dero_queue_worker.ex @@ -0,0 +1,184 @@ +defmodule Ckd.DeroQueueWorker do + @moduledoc false + + use GenServer + require Logger + + @registry Ckd.Application.dero_queue_worker_registry() + @pending_statuses [:awaiting_authorization, :processing] + + def start_link(aid) when is_binary(aid), + do: GenServer.start_link(__MODULE__, aid, name: via_tuple(aid)) + + def init(aid) when is_binary(aid) do + {:ok, %{aid: aid, subs: [], info: []}} + end + + def handle_call({:subscribe_info, info_key}, {from_pid, _}, state = %{aid: aid, subs: subs}) + when is_binary(aid) and is_list(subs) do + new_state = Map.put(state, :subs, subs ++ [{from_pid, info_key}]) + key_info = to_info(info_key, new_state.info) + send(from_pid, {info_key, key_info}) + {:reply, key_info, new_state} + end + + def handle_call({:unsubscribe_info, info_key}, {from_pid, _}, state = %{aid: aid, subs: subs}) + when is_binary(aid) and is_list(subs) do + {:reply, :ok, + Map.put(state, :subs, Enum.reject(subs, fn sub -> sub == {from_pid, info_key} end))} + end + + def handle_call({:read, info_key}, _from, state = %{aid: aid}) + when is_binary(aid) do + {:reply, to_info(info_key, state.info), state} + end + + def handle_call( + {:queue_action, aid, action = {name, opts}}, + _from, + state_was = %{aid: aid, info: info_was, subs: subs} + ) + when is_binary(aid) and is_atom(name) and is_map(opts) and is_list(info_was) do + if get_pending_action(info_was) == nil do + new_item = %{ + qid: Ksuid.generate(), + action: action, + status: :awaiting_authorization, + txid: nil + } + + new_state = Map.put(state_was, :info, [new_item | info_was]) + + Enum.each(subs, fn {pid, info_key} -> + send(pid, {info_key, to_info(info_key, new_state.info)}) + end) + + {:reply, {:ok, new_item}, new_state} + else + {:reply, {:error, "action already pending"}, state_was} + end + end + + def handle_call( + {:authorize_action, qid}, + _from, + state = %{ + aid: aid, + subs: subs, + info: + _info_was = [ + awaiting_authorization_item = %{ + qid: qid, + action: {:m_give, %{dero_i: dero_i, scid: scid, anon?: anon?}}, + status: :awaiting_authorization + } + | info_tail + ] + } + ) + when is_binary(aid) and is_binary(qid) and is_integer(dero_i) and is_boolean(anon?) and + is_binary(scid) and is_list(subs) do + %{dero: account_dero} = Ckd.State.read({:account, aid}) + wallet_client = Ckd.DeroRPC.wallet_client(account_dero) + + # TODO: handle {:error, message} + {:ok, txid} = Ckd.DeroRPC.m_give(wallet_client, scid: scid, dero_i: dero_i, anon?: anon?) + + Ckd.DeroTransfer.subscribe_info({:dero_transfer, aid: aid, txid: txid}) + + processing_item = + awaiting_authorization_item |> Map.put(:txid, txid) |> Map.put(:status, :processing) + + new_state = Map.put(state, :info, [processing_item | info_tail]) + + Enum.each(subs, fn {pid, info_key} -> + send(pid, {info_key, to_info(info_key, new_state.info)}) + end) + + {:reply, :ok, new_state} + end + + def handle_call( + {:cancel_action, qid}, + _from, + state = %{ + aid: aid, + subs: subs, + info: + _info_was = [ + _awaiting_authorization_item = %{ + qid: qid, + action: _action, + status: :awaiting_authorization + } + | info_tail + ] + } + ) + when is_binary(aid) and is_binary(qid) do + new_state = Map.put(state, :info, info_tail) + + Enum.each(subs, fn {pid, info_key} -> + send(pid, {info_key, to_info(info_key, new_state.info)}) + end) + + {:reply, :ok, new_state} + end + + def handle_info({{:dero_transfer, aid: _aid, txid: _txid}, {:error, _}}, state), + do: {:noreply, state} + + def handle_info( + {{:dero_transfer, aid: _aid, txid: txid}, {:ok, _}}, + state = %{info: info_was, subs: subs} + ) + when is_binary(txid) and is_list(info_was) and is_list(subs) do + new_state = + Map.put( + state, + :info, + Enum.map(info_was, fn i -> + if i.txid == txid do + i |> Map.put(:status, :completed) + else + i + end + end) + ) + + Enum.each(subs, fn {pid, info_key} -> + send(pid, {info_key, to_info(info_key, new_state.info)}) + end) + + {:noreply, new_state} + end + + defp get_pending_action(info) when is_list(info) do + Enum.find(info, fn i -> + case i.status do + s when s in @pending_statuses -> true + _ -> false + end + end) + end + + defp to_info({:dero_queue, aid}, info) when is_binary(aid) and is_list(info) do + info + end + + # TODO! + defp to_info({:dero_queue_item, aid: aid, qid: qid}, info) + when is_binary(aid) and is_binary(qid) and is_list(info) do + Enum.find(info, fn i -> + i.qid == qid + end) + end + + defp to_info({:dero_queue_pending_item, aid}, info) + when is_binary(aid) and is_list(info) do + get_pending_action(info) + end + + defp via_tuple(aid) when is_binary(aid), + do: {:via, Registry, {@registry, aid}} +end diff --git a/lib/ckd/services/dero/dero_rpc.ex b/lib/ckd/services/dero/dero_rpc.ex new file mode 100644 index 0000000..f99a0f8 --- /dev/null +++ b/lib/ckd/services/dero/dero_rpc.ex @@ -0,0 +1,525 @@ +defmodule Ckd.DeroRPC do + @moduledoc false + require Logger + + @dero_divisor 100_000 + @tesla_timeout 30_000 + + @spec get_address(Tesla.Client.t()) :: {:error, any} | {:ok, any} + def get_address(client = %Tesla.Client{}) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "GetAddress" + }) + end + + @spec get_height(Tesla.Client.t()) :: {:error, any} | {:ok, any} + def get_height(client = %Tesla.Client{}) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "GetHeight" + }) + end + + @spec get_balance(Tesla.Client.t()) :: {:error, any} | {:ok, any} + def get_balance(client = %Tesla.Client{}) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "GetBalance" + }) + end + + @spec get_sc_balance(Tesla.Client.t(), [{:scid, binary}, ...]) :: + {:error, any} | {:ok, %{balance: %{token_f: float}}} + def get_sc_balance(client = %Tesla.Client{}, scid: scid) when is_binary(scid) do + case request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "GetBalance", + params: %{scid: scid} + }) do + {:ok, %{"balance" => balance}} -> + {:ok, %{balance: %{token_f: balance / @dero_divisor}}} + + e = {:error, _} -> + e + end + end + + @spec get_transfer(Tesla.Client.t(), [{:txid, binary}, ...]) :: {:error, any} | {:ok, any} + def get_transfer(client = %Tesla.Client{}, txid: txid) when is_binary(txid) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "GetTransferbyTXID", + params: %{txid: txid} + }) + end + + # def get_transfers(client = %Tesla.Client{}, min_height: min_height) + # when is_number(min_height) do + # request(client, %{ + # method: "GetTransfers", + # params: %{ + # in: true, + # out: true, + # min_height: min_height, + # max_height: 0 + # } + # }) + # end + + def get_sc(client = %Tesla.Client{}, scid: scid) when is_binary(scid) do + case request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "DERO.GetSC", + params: %{ + scid: scid, + code: true, + variables: true + } + }) do + {:ok, + %{ + "stringkeys" => stringkeys, + # "balances" => balances, + "balance" => balance, + "code" => code + }} -> + {:ok, + %{ + stringkeys: + for( + {k, v} <- stringkeys, + into: %{}, + do: + {k, + case v do + v when is_binary(v) -> Base.decode16!(v, case: :lower) + v -> v + end} + ), + # balances: balances, + balance: %{dero_f: balance / @dero_divisor}, + code: code + }} + + {:ok, %{"balance" => 0, "code" => "", "status" => "OK"}} -> + {:error, "contract not found"} + + e = {:error, _} -> + e + end + end + + @spec m_give(Tesla.Client.t(), [ + {:dero_i, integer} | {:scid, binary} | {:anon?, boolean}, + ... + ]) :: {:error, any} | {:ok, any} + def m_give(client = %Tesla.Client{}, scid: scid, dero_i: dero_i, anon?: anon?) + when is_binary(scid) and is_integer(dero_i) and is_boolean(anon?) do + {entrypoint, ringsize} = + case anon? do + false -> {:G, 2} + true -> {:G2, 64} + end + + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "scinvoke", + params: %{ + scid: scid, + ringsize: ringsize, + sc_dero_deposit: dero_i, + sc_rpc: [ + %{name: :entrypoint, datatype: :S, value: entrypoint} + ] + } + }) + |> handle_txid_response() + end + + def m_set_info(client = %Tesla.Client{}, + scid: scid, + name: name, + ticker: ticker, + purpose: purpose, + website: website + ) + when is_binary(scid) and is_binary(name) and is_binary(ticker) and is_binary(purpose) and + is_binary(website) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "scinvoke", + params: %{ + scid: scid, + ringsize: 2, + sc_rpc: [ + %{name: :entrypoint, datatype: :S, value: :SetInfo}, + %{name: :name, datatype: :S, value: name}, + %{name: :ticker, datatype: :S, value: ticker}, + %{name: :purpose, datatype: :S, value: purpose}, + %{name: :website, datatype: :S, value: website} + ] + } + }) + |> handle_txid_response() + end + + def m_initiate_withdraw(client = %Tesla.Client{}, + scid: scid, + amount: amount, + dest: dest + ) + when is_binary(scid) and is_integer(amount) and is_binary(dest) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "scinvoke", + params: %{ + scid: scid, + ringsize: 2, + sc_rpc: [ + %{name: :entrypoint, datatype: :S, value: :InitiateWithdraw}, + %{name: :amount, datatype: :U, value: amount}, + %{name: :dest, datatype: :S, value: dest} + ] + } + }) + |> handle_txid_response() + end + + def m_withdraw(client = %Tesla.Client{}, + scid: scid, + amount: amount, + dest: dest + ) + when is_binary(scid) and is_integer(amount) and is_binary(dest) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "scinvoke", + params: %{ + scid: scid, + ringsize: 2, + sc_rpc: [ + %{name: :entrypoint, datatype: :S, value: :Withdraw}, + %{name: :amount, datatype: :U, value: amount}, + %{name: :dest, datatype: :S, value: dest} + ] + } + }) + |> handle_txid_response() + end + + def m_initiate_update_code(client = %Tesla.Client{}, scid: scid, code: code) + when is_binary(scid) and is_binary(code) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "scinvoke", + params: %{ + scid: scid, + ringsize: 2, + sc_rpc: [ + %{name: :entrypoint, datatype: :S, value: :InitiateUpdateCode}, + %{name: :code, datatype: :S, value: code} + ] + } + }) + |> handle_txid_response() + end + + def m_update_code( + daemon_client: daemon_client = %Tesla.Client{}, + wallet_client: wallet_client = %Tesla.Client{}, + scid: scid, + code: code + ) + when is_binary(scid) and is_binary(code) do + {:ok, %{"address" => address}} = get_address(wallet_client) + + {:ok, estimate = %{"gasstorage" => fees}} = + request(daemon_client, %{ + jsonrpc: "2.0", + id: "1", + method: "DERO.GetGasEstimate", + params: %{ + sc_rpc: [ + %{name: :SC_ACTION, datatype: :U, value: 0}, + %{name: :SC_ID, datatype: :H, value: scid}, + %{name: :entrypoint, datatype: :S, value: :UpdateCode}, + %{name: :code, datatype: :S, value: code} + ], + signer: address + } + }) + + Logger.info(estimate: estimate) + + request(wallet_client, %{ + jsonrpc: "2.0", + id: "1", + method: "transfer", + params: %{ + scid: scid, + ringsize: 2, + sc_rpc: [ + %{name: :entrypoint, datatype: :S, value: :UpdateCode}, + %{name: :code, datatype: :S, value: code} + ], + fees: fees + } + }) + |> handle_txid_response() + end + + def rtbl_boost( + daemon_client: daemon_client = %Tesla.Client{}, + wallet_client: wallet_client = %Tesla.Client{}, + mscid: mscid, + stars_i: stars_i, + scid: scid, + post: post + ) + when is_binary(mscid) and is_integer(stars_i) and is_binary(scid) and + is_binary(post) do + {:ok, %{"address" => address}} = get_address(wallet_client) + + {:ok, estimate = %{"gasstorage" => fees}} = + request(daemon_client, %{ + jsonrpc: "2.0", + id: "1", + method: "DERO.GetGasEstimate", + params: %{ + sc_rpc: [ + %{name: :SC_ACTION, datatype: :U, value: 0}, + %{name: :SC_ID, datatype: :H, value: scid}, + %{name: :entrypoint, datatype: :S, value: :B}, + %{name: :p, datatype: :S, value: post} + ], + transfers: [%{scid: mscid, burn: stars_i}], + signer: address + } + }) + + Logger.info(boost_estimate: estimate) + + request(wallet_client, %{ + jsonrpc: "2.0", + id: "1", + method: "transfer", + params: %{ + scid: scid, + ringsize: 2, + sc_rpc: [ + %{name: :entrypoint, datatype: :S, value: :B}, + %{name: :p, datatype: :S, value: post} + ], + transfers: [%{scid: mscid, burn: stars_i}], + fees: fees + } + }) + |> handle_txid_response() + end + + @spec initiate_transfer_ownership(Tesla.Client.t(), [{:newOwner, binary} | {:scid, binary}, ...]) :: + {:error, any} | {:ok, any} + def initiate_transfer_ownership(client = %Tesla.Client{}, scid: scid, newOwner: newOwner) + when is_binary(scid) and is_binary(newOwner) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "scinvoke", + params: %{ + scid: scid, + ringsize: 2, + sc_rpc: [ + %{name: :entrypoint, datatype: :S, value: :InitiateTransferOwnership}, + %{name: :newOwner, datatype: :S, value: newOwner} + ] + } + }) + |> handle_txid_response() + end + + @spec transfer_ownership(Tesla.Client.t(), [{:newOwner, binary} | {:scid, binary}, ...]) :: + {:error, any} | {:ok, any} + def transfer_ownership(client = %Tesla.Client{}, scid: scid, newOwner: newOwner) + when is_binary(scid) and is_binary(newOwner) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "scinvoke", + params: %{ + scid: scid, + ringsize: 2, + sc_rpc: [ + %{name: :entrypoint, datatype: :S, value: :TransferOwnership}, + %{name: :newOwner, datatype: :S, value: newOwner} + ] + } + }) + |> handle_txid_response() + end + + @spec claim_ownership(Tesla.Client.t(), [{:scid, binary}, ...]) :: {:error, any} | {:ok, any} + def claim_ownership(client = %Tesla.Client{}, scid: scid) when is_binary(scid) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "scinvoke", + params: %{ + scid: scid, + ringsize: 2, + sc_rpc: [%{name: :entrypoint, datatype: :S, value: :ClaimOwnership}] + } + }) + |> handle_txid_response() + end + + @spec initiate_transfer_delegateship(Tesla.Client.t(), [ + {:newDelegate, binary} | {:scid, binary}, + ... + ]) :: + {:error, any} | {:ok, any} + def initiate_transfer_delegateship(client = %Tesla.Client{}, + scid: scid, + newDelegate: newDelegate + ) + when is_binary(scid) and is_binary(newDelegate) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "scinvoke", + params: %{ + scid: scid, + ringsize: 2, + sc_rpc: [ + %{name: :entrypoint, datatype: :S, value: :InitiateTransferDelegateship}, + %{name: :newDelegate, datatype: :S, value: newDelegate} + ] + } + }) + |> handle_txid_response() + end + + @spec transfer_delegateship(Tesla.Client.t(), [{:newDelegate, binary} | {:scid, binary}, ...]) :: + {:error, any} | {:ok, any} + def transfer_delegateship(client = %Tesla.Client{}, scid: scid, newDelegate: newDelegate) + when is_binary(scid) and is_binary(newDelegate) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "scinvoke", + params: %{ + scid: scid, + ringsize: 2, + sc_rpc: [ + %{name: :entrypoint, datatype: :S, value: :TransferDelegateship}, + %{name: :newDelegate, datatype: :S, value: newDelegate} + ] + } + }) + |> handle_txid_response() + end + + @spec claim_delegateship(Tesla.Client.t(), [{:scid, binary}, ...]) :: {:error, any} | {:ok, any} + def claim_delegateship(client = %Tesla.Client{}, scid: scid) when is_binary(scid) do + request(client, %{ + jsonrpc: "2.0", + id: "1", + method: "scinvoke", + params: %{ + scid: scid, + ringsize: 2, + sc_rpc: [%{name: :entrypoint, datatype: :S, value: :ClaimDelegateship}] + } + }) + |> handle_txid_response() + end + + def rtbl_something do + # curl --silent http://127.0.0.1:40403/json_rpc \ + # -d '{"jsonrpc":"2.0","id":"0","method":"transfer","params":{"scid":"'"$appscid"'","ringsize":2, "sc_rpc":[{"name":"entrypoint","datatype":"S","value":"Publish"},{"name":"topic","datatype":"S","value":"mytopic"},{"name":"payload","datatype":"S","value":"{\"name\":\"Stephen Ausman\"}"}], "transfers": [{"scid":"'"$scid"'", "burn":123000}] }}' \ + # -H 'Content-Type: application/json' \ + # -u "8123:metm" + end + + def install_sc(client = %Tesla.Client{}, sc_binary) when is_binary(sc_binary) do + {:ok, %{body: body}} = Tesla.post(client, "/install_sc", sc_binary) + %{"txid" => txid} = Jason.decode!(body) + {:ok, txid} + end + + defp request(client = %Tesla.Client{}, body = %{method: method}) + when is_binary(method) and is_map(body) do + case Tesla.post(client, "/json_rpc", body) do + {:ok, %{body: %{"result" => result}}} -> + {:ok, result} + + {:ok, %{body: %{"error" => %{"code" => -32_098, "message" => message}}}} -> + {:error, message} + + {:error, message} -> + {:error, message} + end + end + + defp handle_txid_response(result) do + case result do + {:ok, %{"txid" => txid}} -> + {:ok, txid} + + e = {:error, _} -> + e + end + end + + def wallet_client(%{wallet_url: wallet_url, wallet_user: wallet_user, wallet_pass: wallet_pass}) + when is_binary(wallet_url) and is_binary(wallet_user) and is_binary(wallet_pass) do + middleware = [ + {Tesla.Middleware.BaseUrl, wallet_url}, + {Tesla.Middleware.BasicAuth, username: wallet_user, password: wallet_pass}, + Tesla.Middleware.JSON, + {Tesla.Middleware.Timeout, timeout: @tesla_timeout} + # Tesla.Middleware.Logger + ] + + Tesla.client(middleware) + end + + def wallet_client_form_urlencoded(%{ + wallet_url: wallet_url, + wallet_user: wallet_user, + wallet_pass: wallet_pass + }) + when is_binary(wallet_url) and is_binary(wallet_user) and is_binary(wallet_pass) do + middleware = [ + {Tesla.Middleware.BaseUrl, wallet_url}, + {Tesla.Middleware.BasicAuth, username: wallet_user, password: wallet_pass}, + Tesla.Middleware.FormUrlencoded, + {Tesla.Middleware.Timeout, timeout: @tesla_timeout} + # Tesla.Middleware.Logger + ] + + Tesla.client(middleware) + end + + def daemon_client(%{daemon_url: daemon_url}) when is_binary(daemon_url) do + middleware = [ + {Tesla.Middleware.BaseUrl, daemon_url}, + Tesla.Middleware.JSON, + {Tesla.Middleware.Timeout, timeout: @tesla_timeout} + # Tesla.Middleware.Logger + ] + + Tesla.client(middleware) + end +end diff --git a/lib/ckd/services/dero/dero_transfer.ex b/lib/ckd/services/dero/dero_transfer.ex new file mode 100644 index 0000000..7adb51c --- /dev/null +++ b/lib/ckd/services/dero/dero_transfer.ex @@ -0,0 +1,49 @@ +defmodule Ckd.DeroTransfer do + @moduledoc false + + use DynamicSupervisor + + alias Ckd.DeroTransferWorker + + @spec start_link(any) :: :ignore | {:error, any} | {:ok, pid} + def start_link(_), do: DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__) + + @impl true + @spec init(any) :: + {:ok, + %{ + extra_arguments: list, + intensity: non_neg_integer, + max_children: :infinity | non_neg_integer, + period: pos_integer, + strategy: :one_for_one + }} + def init(_), do: DynamicSupervisor.init(strategy: :one_for_one) + + @spec subscribe_info({:dero_transfer, [{:aid, binary} | {:txid, binary}, ...]}) :: {:ok, map()} + def subscribe_info(info_key = {:dero_transfer, aid: aid, txid: txid}) + when is_binary(aid) and is_binary(txid), + do: GenServer.call(worker_pid({aid, txid}), {:subscribe_info, info_key}) + + @spec unsubscribe_info({:dero_transfer, [{:aid, binary} | {:txid, binary}, ...]}) :: :ok + def unsubscribe_info(info_key = {:dero_transfer, aid: aid, txid: txid}) + when is_binary(aid) and is_binary(txid), + do: GenServer.call(worker_pid({aid, txid}), {:unsubscribe_info, info_key}) + + defp worker_pid({aid, txid}) when is_binary(aid) and is_binary(txid), + do: + (case(maybe_worker({aid, txid})) do + [{pid, _}] -> + pid + + [] -> + {:ok, pid} = start_worker({aid, txid}) + pid + end) + + defp maybe_worker({aid, txid}) when is_binary(aid) and is_binary(txid), + do: Registry.lookup(Ckd.Application.m_worker_registry(), {aid, txid}) + + defp start_worker({aid, txid}) when is_binary(aid) and is_binary(txid), + do: DynamicSupervisor.start_child(__MODULE__, {DeroTransferWorker, {aid, txid}}) +end diff --git a/lib/ckd/services/dero/dero_transfer_worker.ex b/lib/ckd/services/dero/dero_transfer_worker.ex new file mode 100644 index 0000000..12a737b --- /dev/null +++ b/lib/ckd/services/dero/dero_transfer_worker.ex @@ -0,0 +1,119 @@ +defmodule Ckd.DeroTransferWorker do + @moduledoc false + + use GenServer + require Logger + + @registry Ckd.Application.dero_transfer_worker_registry() + + def start_link({aid, txid}) when is_binary(aid) and is_binary(txid), + do: GenServer.start_link(__MODULE__, {aid, txid}, name: via_tuple({aid, txid})) + + def init({aid, txid}) when is_binary(aid) and is_binary(txid) do + _account = Ckd.State.subscribe_info({:account, aid}) + _info = Ckd.DeroWallet.subscribe_info({:dero_wallet, aid}) + + {:ok, %{aid: aid, txid: txid, dero: nil, subs: [], info: nil}} + end + + def handle_info({{:account, _aid}, nil}, _state), + do: Process.exit(self(), :normal) + + def handle_info( + {{:account, aid}, %{dero: same_dero}}, + state = %{aid: aid, dero: same_dero} + ), + do: {:no_reply, state} + + def handle_info({{:account, aid}, %{dero: new_dero}}, state = %{aid: aid}) do + # send(self(), {:fetch_info, dero: new_dero}) + + {:noreply, + state + |> Map.put(:dero, new_dero) + |> Map.put(:wallet_client, Ckd.DeroRPC.wallet_client(new_dero))} + end + + def handle_info( + {{:dero_wallet, _aid}, {:ok, %{height: _}}}, + state = %{ + info: _info_was = {:ok, _} + } + ) do + {:noreply, state} + end + + # don't do anything if we already have info + def handle_info({{:dero_wallet, aid}, _}, state = %{aid: aid, info: {:ok, _}}) + when is_binary(aid) do + {:noreply, state} + end + + def handle_info( + {{:dero_wallet, aid}, {:ok, %{height: _}}}, + state = %{ + aid: aid, + txid: txid, + wallet_client: wallet_client = %Tesla.Client{}, + info: info_was, + subs: subs + } + ) + when is_binary(aid) and is_binary(txid) do + new_info = + case Ckd.DeroRPC.get_transfer(wallet_client, txid: txid) do + success = {:ok, _} -> + success + + {:error, message} -> + error = %{message: message, code: :dero_transfer_worker} + Logger.error(error) + {:error, error} + end + + if new_info != info_was do + Enum.each(subs, fn {pid, info_key = {:dero_transfer, aid: ^aid, txid: ^txid}} -> + send(pid, {info_key, new_info}) + end) + end + + {:noreply, Map.put(state, :info, new_info)} + end + + def handle_info( + {{:dero_wallet, _aid}, new_info = {:error, %{code: _, message: _}}}, + state = %{aid: aid, txid: txid, subs: subs, info: info_was} + ) do + if new_info != info_was do + Enum.each(subs, fn {pid, info_key = {:dero_transfer, aid: ^aid, txid: ^txid}} -> + send(pid, {info_key, new_info}) + end) + end + + {:noreply, Map.put(state, :info, new_info)} + end + + def handle_call( + {:subscribe_info, info_key = {:dero_transfer, aid: aid, txid: txid}}, + {from_pid, _}, + state = %{aid: aid, txid: txid, subs: subs} + ) + when is_binary(aid) and is_binary(txid) and is_list(subs) do + new_state = Map.put(state, :subs, subs ++ [{from_pid, info_key}]) + send(from_pid, {info_key, new_state.info}) + {:reply, {:ok, new_state.info}, new_state} + end + + def handle_call( + {:unsubscribe_info, info_key = {:dero_transfer, aid: aid, txid: txid}}, + {from_pid, _}, + state = %{aid: aid, txid: txid, subs: subs} + ) + when is_binary(aid) and is_binary(txid) and is_list(subs) do + {:reply, :ok, + Map.put(state, :subs, Enum.reject(subs, fn sub -> sub == {from_pid, info_key} end))} + end + + defp via_tuple({aid, txid}) when is_binary(aid) and is_binary(txid), + do: {:via, Registry, {@registry, {aid, txid}}} +end diff --git a/lib/ckd/dero_wallet.ex b/lib/ckd/services/dero/dero_wallet.ex similarity index 92% rename from lib/ckd/dero_wallet.ex rename to lib/ckd/services/dero/dero_wallet.ex index f8df0c3..af411b7 100644 --- a/lib/ckd/dero_wallet.ex +++ b/lib/ckd/services/dero/dero_wallet.ex @@ -26,6 +26,9 @@ defmodule Ckd.DeroWallet do def unsubscribe_info(key = {:dero_wallet, aid}) when is_binary(aid), do: GenServer.call(worker_pid(aid), {:unsubscribe_info, key}) + def read(key = {:dero_wallet, aid}) when is_binary(aid), + do: GenServer.call(worker_pid(aid), {:read, key}) + defp worker_pid(aid) when is_binary(aid), do: (case(maybe_worker(aid)) do diff --git a/lib/ckd/services/dero/dero_wallet_worker.ex b/lib/ckd/services/dero/dero_wallet_worker.ex new file mode 100644 index 0000000..75d1b79 --- /dev/null +++ b/lib/ckd/services/dero/dero_wallet_worker.ex @@ -0,0 +1,159 @@ +defmodule Ckd.DeroWalletWorker do + @moduledoc false + + use GenServer + require Logger + + @registry Ckd.Application.dero_wallet_worker_registry() + @fetch_interval 1_000 + @fetch_interval_slow 8_000 + @dero_divisor 100_000 + @send_delay 1_000 + + def start_link(aid) when is_binary(aid), + do: GenServer.start_link(__MODULE__, aid, name: via_tuple(aid)) + + def init(aid) when is_binary(aid) do + _existing_account = %{} = Ckd.State.subscribe_info({:account, aid}) + {:ok, %{aid: aid, subs: [], dero: nil, info: nil, queue: []}} + end + + def handle_info({{:account, _aid}, nil}, _state), + do: Process.exit(self(), :normal) + + def handle_info( + {{:account, aid}, %{dero: same_dero}}, + state = %{aid: aid, dero: same_dero} + ), + do: {:no_reply, state} + + def handle_info({{:account, aid}, %{dero: new_dero}}, state = %{aid: aid}) do + send(self(), {:fetch_info, dero: new_dero}) + + {:noreply, + state + |> Map.put(:dero, new_dero) + |> Map.put(:wallet_client, Ckd.DeroRPC.wallet_client(new_dero))} + end + + def handle_info( + msg = {:fetch_info, dero: dero}, + state = %{ + aid: aid, + dero: dero, + info: info_was, + subs: subs, + wallet_client: wallet_client + } + ) do + new_info = + with {:ok, height} <- get_height(wallet_client), + {:ok, address} <- get_address(wallet_client, info_was, new_height: height), + {:ok, balance: balance, unlocked_balance: unlocked_balance} <- + get_balance(wallet_client, info_was, new_height: height) do + ok_info = %{ + height: height, + address: address, + balance: balance, + unlocked_balance: unlocked_balance + } + + Process.send_after(self(), msg, @fetch_interval) + + {:ok, ok_info} + else + {:error, message} -> + error = %{message: message, code: :dero_wallet_worker} + Logger.error(error) + Process.send_after(self(), msg, @fetch_interval_slow) + {:error, error} + end + + if new_info != info_was do + Enum.each(subs, fn {pid, info_key = {:dero_wallet, ^aid}} -> + # without delay, m* balance doesn't get updated until next height (sometimes) + Process.send_after(pid, {info_key, new_info}, @send_delay) + end) + end + + {:noreply, Map.put(state, :info, new_info)} + end + + def handle_info({:fetch_info, dero: _stale_dero}, state), do: {:noreply, state} + + def handle_call( + {:subscribe_info, info_key = {:dero_wallet, aid}}, + {from_pid, _}, + state = %{aid: aid, subs: subs} + ) + when is_binary(aid) and is_list(subs) do + new_state = Map.put(state, :subs, subs ++ [{from_pid, info_key}]) + send(from_pid, {info_key, new_state.info}) + {:reply, new_state.info, new_state} + end + + def handle_call( + {:unsubscribe_info, info_key = {:dero_wallet, aid}}, + {from_pid, _}, + state = %{aid: aid, subs: subs} + ) + when is_binary(aid) and is_list(subs) do + {:reply, :ok, + Map.put(state, :subs, Enum.reject(subs, fn sub -> sub == {from_pid, info_key} end))} + end + + def handle_call( + {:read, _info_key = {:dero_wallet, aid}}, + _from, + state = %{aid: aid, info: info} + ) + when is_binary(aid) do + {:reply, info, state} + end + + # handle_call(:queue_action) + + defp get_height(wallet_client) do + case Ckd.DeroRPC.get_height(wallet_client) do + {:ok, %{"height" => height}} -> {:ok, height} + e -> e + end + end + + defp get_address(_wallet_client, {:ok, ok_info_was = %{height: same_height}}, + new_height: same_height + ) + when is_number(same_height), + do: {:ok, ok_info_was.address} + + defp get_address(wallet_client, _info_was, new_height: _) do + case Ckd.DeroRPC.get_address(wallet_client) do + {:ok, %{"address" => address}} -> + {:ok, address} + + e -> + e + end + end + + defp get_balance(_wallet_client, {:ok, ok_info_was = %{height: same_height}}, + new_height: same_height + ) + when is_number(same_height) do + {:ok, balance: ok_info_was.balance, unlocked_balance: ok_info_was.unlocked_balance} + end + + defp get_balance(wallet_client, _info_was, new_height: _) do + case Ckd.DeroRPC.get_balance(wallet_client) do + {:ok, %{"balance" => balance, "unlocked_balance" => unlocked_balance}} -> + {:ok, + balance: balance / @dero_divisor, unlocked_balance: unlocked_balance / @dero_divisor} + + e -> + e + end + end + + defp via_tuple(aid) when is_binary(aid), + do: {:via, Registry, {@registry, aid}} +end diff --git a/lib/ckd/services/m/m.ex b/lib/ckd/services/m/m.ex new file mode 100644 index 0000000..b2649a1 --- /dev/null +++ b/lib/ckd/services/m/m.ex @@ -0,0 +1,85 @@ +defmodule Ckd.M do + @moduledoc false + + use DynamicSupervisor + + alias Ckd.MWorker + + @spec start_link(any) :: :ignore | {:error, any} | {:ok, pid} + def start_link(_), do: DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__) + + @impl true + @spec init(any) :: + {:ok, + %{ + extra_arguments: list, + intensity: non_neg_integer, + max_children: :infinity | non_neg_integer, + period: pos_integer, + strategy: :one_for_one + }} + def init(_), do: DynamicSupervisor.init(strategy: :one_for_one) + + @spec subscribe_info({:m, [{:aid, binary} | {:scid, binary}, ...]}) :: + {:ok, map()} | {:error, map()} + def subscribe_info(info_key = {:m, aid: aid, scid: scid}) + when is_binary(aid) and is_binary(scid), + do: GenServer.call(worker_pid({aid, scid}), {:subscribe_info, info_key}) + + @spec unsubscribe_info({:m, [{:aid, binary} | {:scid, binary}, ...]}) :: :ok + def unsubscribe_info(info_key = {:m, aid: aid, scid: scid}) + when is_binary(aid) and is_binary(scid), + do: GenServer.call(worker_pid({aid, scid}), {:unsubscribe_info, info_key}) + + @spec read({:m, [{:aid, binary} | {:scid, binary}, ...]}) :: {:ok, map()} | {:error, map()} + def read(info_key = {:m, aid: aid, scid: scid}) + when is_binary(aid) and is_binary(scid), + do: GenServer.call(worker_pid({aid, scid}), {:read, info_key}) + + @spec queue_give(binary, %{ + :dero => binary, + :scid => binary, + :anon? => boolean, + optional(any) => any + }) :: any + def queue_give(aid, %{scid: scid, dero: dero, anon?: anon?}) + when is_binary(aid) and is_binary(scid) and is_binary(dero) and is_boolean(anon?) do + case to_dero_i(dero) do + {:ok, dero_i} -> + Ckd.DeroQueue.queue_action( + aid, + {:m_give, %{scid: scid, dero_i: dero_i, anon?: anon?}} + ) + + e = {:error, _} -> + e + end + end + + defp worker_pid({aid, scid}) when is_binary(aid) and is_binary(scid), + do: + (case(maybe_worker({aid, scid})) do + [{pid, _}] -> + pid + + [] -> + {:ok, pid} = start_worker({aid, scid}) + pid + end) + + defp maybe_worker({aid, scid}) when is_binary(aid) and is_binary(scid), + do: Registry.lookup(Ckd.Application.m_worker_registry(), {aid, scid}) + + defp start_worker({aid, scid}) when is_binary(aid) and is_binary(scid), + do: DynamicSupervisor.start_child(__MODULE__, {MWorker, {aid, scid}}) + + defp to_dero_i(dero_string) when is_binary(dero_string) do + case Float.parse(dero_string) do + {dero_float, ""} -> + {:ok, round(dero_float * 100_000)} + + _ -> + {:error, "failed to parse dero int"} + end + end +end diff --git a/lib/ckd/services/m/m_worker.ex b/lib/ckd/services/m/m_worker.ex new file mode 100644 index 0000000..0c1d2b9 --- /dev/null +++ b/lib/ckd/services/m/m_worker.ex @@ -0,0 +1,136 @@ +defmodule Ckd.MWorker do + @moduledoc false + + use GenServer + require Logger + + @registry Ckd.Application.m_worker_registry() + # @block_time 18_000 + # @fetch_interval 400 + # @dero_divisor 100_000 + + def start_link({aid, scid}) when is_binary(aid) and is_binary(scid), + do: GenServer.start_link(__MODULE__, {aid, scid}, name: via_tuple({aid, scid})) + + def init({aid, scid}) when is_binary(aid) and is_binary(scid) do + _account = Ckd.State.subscribe_info({:account, aid}) + _info = Ckd.DeroWallet.subscribe_info({:dero_wallet, aid}) + + {:ok, %{aid: aid, scid: scid, dero: nil, subs: [], info: nil}} + end + + def handle_info({{:account, _aid}, nil}, _state), + do: Process.exit(self(), :normal) + + def handle_info( + {{:account, aid}, %{dero: same_dero}}, + state = %{aid: aid, dero: same_dero} + ), + do: {:no_reply, state} + + def handle_info({{:account, aid}, %{dero: new_dero}}, state = %{aid: aid}) do + # send(self(), {:fetch_info, dero: new_dero}) + + {:noreply, + state + |> Map.put(:dero, new_dero) + |> Map.put(:daemon_client, Ckd.DeroRPC.daemon_client(new_dero)) + |> Map.put(:wallet_client, Ckd.DeroRPC.wallet_client(new_dero))} + end + + def handle_info( + {{:dero_wallet, aid}, {:ok, %{height: height}}}, + state = %{ + aid: aid, + scid: scid, + daemon_client: daemon_client = %Tesla.Client{}, + wallet_client: wallet_client = %Tesla.Client{}, + info: info_was, + subs: subs + } + ) do + new_info = + with {:ok, %{balance: %{dero_f: dao_dero_balance_f}, stringkeys: stringkeys, code: code}} + when is_float(dao_dero_balance_f) <- + Ckd.DeroRPC.get_sc(daemon_client, scid: scid), + {:ok, %{balance: %{token_f: wallet_balance_f}}} when is_float(wallet_balance_f) <- + Ckd.DeroRPC.get_sc_balance(wallet_client, scid: scid) do + {:ok, + %{ + height: height, + code: code, + dao: %{ + balance: %{dero_f: dao_dero_balance_f}, + info: %{ + name: Map.get(stringkeys, "info:name", ""), + ticker: Map.get(stringkeys, "info:ticker", ""), + purpose: Map.get(stringkeys, "info:purpose", ""), + website: Map.get(stringkeys, "info:website", "") + } + }, + wallet: %{ + balance: %{stars_f: wallet_balance_f} + } + }} + else + {:error, message} -> + error = %{message: message, code: :m_worker} + Logger.error(error) + {:error, error} + end + + if new_info != info_was do + Enum.each(subs, fn {pid, info_key = {:m, aid: ^aid, scid: ^scid}} -> + send(pid, {info_key, new_info}) + end) + end + + {:noreply, Map.put(state, :info, new_info)} + end + + def handle_info( + {{:dero_wallet, _aid}, new_info = {:error, %{code: _, message: _}}}, + state = %{aid: aid, scid: scid, subs: subs, info: info_was} + ) do + if new_info != info_was do + Enum.each(subs, fn {pid, info_key = {:m, aid: ^aid, scid: ^scid}} -> + send(pid, {info_key, new_info}) + end) + end + + {:noreply, Map.put(state, :info, new_info)} + end + + def handle_call( + {:subscribe_info, info_key = {:m, aid: aid, scid: scid}}, + {from_pid, _}, + state = %{aid: aid, scid: scid, subs: subs} + ) + when is_binary(aid) and is_binary(scid) and is_list(subs) do + new_state = Map.put(state, :subs, subs ++ [{from_pid, info_key}]) + send(from_pid, {info_key, new_state.info}) + {:reply, new_state.info, new_state} + end + + def handle_call( + {:read, _info_key = {:m, aid: aid, scid: scid}}, + _from, + state = %{aid: aid, scid: scid} + ) + when is_binary(aid) and is_binary(scid) do + {:reply, state.info, state} + end + + def handle_call( + {:unsubscribe_info, info_key = {:m, aid: aid, scid: scid}}, + {from_pid, _}, + state = %{aid: aid, scid: scid, subs: subs} + ) + when is_binary(aid) and is_binary(scid) and is_list(subs) do + {:reply, :ok, + Map.put(state, :subs, Enum.reject(subs, fn sub -> sub == {from_pid, info_key} end))} + end + + defp via_tuple({aid, scid}) when is_binary(aid) and is_binary(scid), + do: {:via, Registry, {@registry, {aid, scid}}} +end diff --git a/lib/ckd/services/rtbl/rtbl.ex b/lib/ckd/services/rtbl/rtbl.ex new file mode 100644 index 0000000..263152b --- /dev/null +++ b/lib/ckd/services/rtbl/rtbl.ex @@ -0,0 +1,63 @@ +defmodule Ckd.Rtbl do + @moduledoc false + + use DynamicSupervisor + + alias Ckd.RtblWorker + + @spec start_link(any) :: :ignore | {:error, any} | {:ok, pid} + def start_link(_), do: DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__) + + @impl true + @spec init(any) :: + {:ok, + %{ + extra_arguments: list, + intensity: non_neg_integer, + max_children: :infinity | non_neg_integer, + period: pos_integer, + strategy: :one_for_one + }} + def init(_), do: DynamicSupervisor.init(strategy: :one_for_one) + + def subscribe_info(info_key = {:rtbl, aid: aid, scid: scid}) + when is_binary(aid) and is_binary(scid), + do: GenServer.call(worker_pid({aid, scid}), {:subscribe_info, info_key}) + + def unsubscribe_info(info_key = {:rtbl, aid: aid, scid: scid}) + when is_binary(aid) and is_binary(scid), + do: GenServer.call(worker_pid({aid, scid}), {:unsubscribe_info, info_key}) + + def read(info_key = {:rtbl, aid: aid, scid: scid}) + when is_binary(aid) and is_binary(scid), + do: GenServer.call(worker_pid({aid, scid}), {:read, info_key}) + + # @spec queue_give(binary, %{:dero => binary, :scid => binary, optional(any) => any}) :: any + # def queue_give(aid, %{scid: scid, dero: dero}) + # when is_binary(aid) and is_binary(scid) and is_binary(dero) do + # case to_dero_int(dero) do + # {:ok, dero_int} -> + # Ckd.DeroQueue.queue_action(aid, {:rtbl_give_dero, %{scid: scid, dero: dero_int}}) + + # e = {:error, _} -> + # e + # end + # end + + defp worker_pid({aid, scid}) when is_binary(aid) and is_binary(scid), + do: + (case(maybe_worker({aid, scid})) do + [{pid, _}] -> + pid + + [] -> + {:ok, pid} = start_worker({aid, scid}) + pid + end) + + defp maybe_worker({aid, scid}) when is_binary(aid) and is_binary(scid), + do: Registry.lookup(Ckd.Application.rtbl_worker_registry(), {aid, scid}) + + defp start_worker({aid, scid}) when is_binary(aid) and is_binary(scid), + do: DynamicSupervisor.start_child(__MODULE__, {RtblWorker, {aid, scid}}) +end diff --git a/lib/ckd/services/rtbl/rtbl_worker.ex b/lib/ckd/services/rtbl/rtbl_worker.ex new file mode 100644 index 0000000..60e120c --- /dev/null +++ b/lib/ckd/services/rtbl/rtbl_worker.ex @@ -0,0 +1,215 @@ +defmodule Ckd.RtblWorker do + @moduledoc false + + use GenServer + require Logger + + @registry Ckd.Application.rtbl_worker_registry() + # @block_time 18_000 + # @fetch_interval 400 + @dero_divisor 100_000 + + def start_link({aid, scid}) when is_binary(aid) and is_binary(scid), + do: GenServer.start_link(__MODULE__, {aid, scid}, name: via_tuple({aid, scid})) + + def init({aid, scid}) when is_binary(aid) and is_binary(scid) do + _account = Ckd.State.subscribe_info({:account, aid}) + _info = Ckd.DeroWallet.subscribe_info({:dero_wallet, aid}) + + {:ok, %{aid: aid, scid: scid, dero: nil, subs: [], info: nil}} + end + + def handle_info({{:account, _aid}, nil}, _state), + do: Process.exit(self(), :normal) + + def handle_info( + {{:account, aid}, %{dero: same_dero}}, + state = %{aid: aid, dero: same_dero} + ), + do: {:no_reply, state} + + def handle_info({{:account, aid}, %{dero: new_dero}}, state = %{aid: aid}) do + # send(self(), {:fetch_info, dero: new_dero}) + + {:noreply, + state + |> Map.put(:dero, new_dero) + |> Map.put(:daemon_client, Ckd.DeroRPC.daemon_client(new_dero))} + + # |> Map.put(:wallet_client, Ckd.DeroRPC.wallet_client(new_dero))} + end + + def handle_info( + {{:dero_wallet, aid}, {:ok, %{height: height}}}, + state = %{ + aid: aid, + scid: scid, + daemon_client: daemon_client = %Tesla.Client{}, + # wallet_client: wallet_client = %Tesla.Client{}, + info: info_was, + subs: subs + } + ) do + new_info = + case Ckd.DeroRPC.get_sc(daemon_client, scid: scid) do + {:ok, %{balance: %{dero_f: dao_dero_balance_f}, stringkeys: stringkeys, code: code}} + when is_float(dao_dero_balance_f) -> + {:ok, + %{ + height: height, + code: code, + balance: %{dero_f: dao_dero_balance_f}, + mscid: Map.get(stringkeys, "mscid", ""), + info: %{ + name: Map.get(stringkeys, "info:name", ""), + purpose: Map.get(stringkeys, "info:purpose", ""), + website: Map.get(stringkeys, "info:website", "") + }, + posts: + Map.to_list(stringkeys) + |> Enum.filter(fn {k, _v} -> String.starts_with?(k, "b:") end) + |> Enum.group_by(fn {k, _v} -> + String.split(k, ":") + |> Enum.reverse() + |> tl() + |> Enum.reverse() + end) + |> Map.to_list() + |> Enum.map(fn {_key_parts = ["b", timestamp, random], kvs} -> + {timestamp, ""} = Integer.parse(timestamp) + {_, post} = Enum.find(kvs, fn {sk, _sv} -> String.ends_with?(sk, ":p") end) + {_, stars} = Enum.find(kvs, fn {sk, _sv} -> String.ends_with?(sk, ":s") end) + + %{ + bid: "#{timestamp}:#{random}", + post: post, + stars: stars, + timestamp: timestamp + } + end) + |> Enum.group_by(fn %{post: post} -> post end) + |> Map.to_list() + |> Enum.sort_by(fn {_, boosts} -> + Enum.reduce(boosts, 0, fn %{stars: stars}, acc -> acc + stars end) + end) + |> Enum.reverse() + |> Enum.map(fn {post, boosts} -> + %{ + rawText: post, + totalStarsFormatted: + Ckd.Util.to_balance_formatted( + Enum.reduce(boosts, 0, fn %{stars: stars}, acc -> acc + stars end) / + @dero_divisor + ), + boosts: + Enum.map(boosts, fn b -> + %{ + # bid: b.bid, + starsFormatted: Ckd.Util.to_balance_formatted(b.stars / @dero_divisor), + timestamp: + b.timestamp |> DateTime.from_unix!(:second) |> DateTime.to_iso8601() + } + end) + # Enum.group_by(boosts, fn %{reply: reply} -> reply end) + # |> Map.to_list() + # |> Enum.sort_by(fn {_, boosts} -> + # Enum.reduce(boosts, 0, fn %{stars: stars}, acc -> acc + stars end) + # end) + # |> Enum.reverse() + # |> Enum.map(fn {reply, boosts} -> + # %{ + # rawText: reply, + # totalStarsFormatted: + # Ckd.Util.to_balance_formatted( + # Enum.reduce(boosts, 0, fn %{stars: stars}, acc -> acc + stars end) / + # @dero_divisor + # ), + # boosts: + # boosts + # |> Enum.sort_by(fn %{stars: stars} -> stars end) + # |> Enum.reverse() + # |> Enum.map(fn %{ + # addr: addr, + # stars: stars, + # timestamp: timestamp + # } -> + # %{ + # addr: addr, + # starsFormatted: + # Ckd.Util.to_balance_formatted(stars / @dero_divisor), + # timestamp: + # timestamp + # |> DateTime.from_unix!(:second) + # |> DateTime.to_iso8601() + # } + # end) + # } + # end) + # Enum.map(boosts, fn %{signer: signer, stars: stars, timestamp: timestamp} -> + # %{signer: signer, stars: stars, timestamp: timestamp} + # end) + } + end) + }} + + {:error, message} -> + error = %{message: message, code: :m_worker} + Logger.error(error) + {:error, error} + end + + if new_info != info_was do + Enum.each(subs, fn {pid, info_key = {:rtbl, aid: ^aid, scid: ^scid}} -> + send(pid, {info_key, new_info}) + end) + end + + {:noreply, Map.put(state, :info, new_info)} + end + + def handle_info( + {{:dero_wallet, _aid}, new_info = {:error, %{code: _, message: _}}}, + state = %{aid: aid, scid: scid, subs: subs, info: info_was} + ) do + if new_info != info_was do + Enum.each(subs, fn {pid, info_key = {:rtbl, aid: ^aid, scid: ^scid}} -> + send(pid, {info_key, new_info}) + end) + end + + {:noreply, Map.put(state, :info, new_info)} + end + + def handle_call( + {:subscribe_info, info_key = {:rtbl, aid: aid, scid: scid}}, + {from_pid, _}, + state = %{aid: aid, scid: scid, subs: subs} + ) + when is_binary(aid) and is_binary(scid) and is_list(subs) do + new_state = Map.put(state, :subs, subs ++ [{from_pid, info_key}]) + send(from_pid, {info_key, new_state.info}) + {:reply, new_state.info, new_state} + end + + def handle_call( + {:read, _info_key = {:rtbl, aid: aid, scid: scid}}, + _from, + state = %{aid: aid, scid: scid} + ) + when is_binary(aid) and is_binary(scid) do + {:reply, state.info, state} + end + + def handle_call( + {:unsubscribe_info, info_key = {:rtbl, aid: aid, scid: scid}}, + {from_pid, _}, + state = %{aid: aid, scid: scid, subs: subs} + ) + when is_binary(aid) and is_binary(scid) and is_list(subs) do + {:reply, :ok, + Map.put(state, :subs, Enum.reject(subs, fn sub -> sub == {from_pid, info_key} end))} + end + + defp via_tuple({aid, scid}) when is_binary(aid) and is_binary(scid), + do: {:via, Registry, {@registry, {aid, scid}}} +end diff --git a/lib/ckd/state.ex b/lib/ckd/state.ex index 8151112..56f74fe 100644 --- a/lib/ckd/state.ex +++ b/lib/ckd/state.ex @@ -36,7 +36,7 @@ defmodule Ckd.State do GenServer.call(__MODULE__, {:read, info_key}) end - @spec subscribe_info(:ckd_state | {:account, binary()}) :: {:ok, any()} + @spec subscribe_info(:ckd_state | {:account, binary()}) :: any() def subscribe_info(info_key) do GenServer.call(__MODULE__, {:subscribe_info, info_key}) end @@ -153,7 +153,7 @@ defmodule Ckd.State do new_state = Map.put(old_state, :subs, subs ++ [{from_pid, info_key}]) info = to_info(info_key, new_state) send(from_pid, {info_key, info}) - {:reply, {:ok, info}, new_state} + {:reply, info, new_state} end @impl true diff --git a/lib/ckd/util.ex b/lib/ckd/util.ex new file mode 100644 index 0000000..4f1f89e --- /dev/null +++ b/lib/ckd/util.ex @@ -0,0 +1,27 @@ +defmodule Ckd.Util do + @trailing_zero "0" + @trailing_decimal "." + @slug_regex ~r/[^a-zA-Z0-9 ]/ + + @spec to_balance(number) :: binary + def to_balance(n) when is_number(n) do + Number.Delimit.number_to_delimited(n, precision: 5, delimiter: "") + |> String.trim_trailing(@trailing_zero) + |> String.trim_trailing(@trailing_decimal) + end + + def to_balance_formatted(n) when is_number(n) do + Number.Delimit.number_to_delimited(n, precision: 5) + |> String.trim_trailing(@trailing_zero) + |> String.trim_trailing(@trailing_decimal) + end + + @spec to_slug(binary) :: binary + def to_slug(s) when is_binary(s) do + s + |> String.downcase() + |> String.replace(@slug_regex, "") + |> String.split() + |> Enum.join("-") + end +end diff --git a/lib/ckd_web/channels/socket.ex b/lib/ckd_web/channels/socket.ex index 2aef29b..7fbff7e 100644 --- a/lib/ckd_web/channels/socket.ex +++ b/lib/ckd_web/channels/socket.ex @@ -8,7 +8,8 @@ defmodule CkdWeb.Socket do channel "v0:signIn", V0.SignInChannel channel "v0:account", V0.AccountChannel channel "v0:dero", V0.DeroChannel - # channel "v0:m:*", V0.MChannel + channel "v0:m:*", V0.MChannel + channel "v0:rtbl:*", V0.RtblChannel # Socket params are passed from the client and can # be used to verify and authenticate a user. After diff --git a/lib/ckd_web/channels/v0/dero_channel.ex b/lib/ckd_web/channels/v0/dero_channel.ex index 1be663c..281d9ea 100644 --- a/lib/ckd_web/channels/v0/dero_channel.ex +++ b/lib/ckd_web/channels/v0/dero_channel.ex @@ -2,36 +2,108 @@ defmodule CkdWeb.Channels.V0.DeroChannel do @moduledoc false use Phoenix.Channel + alias CkdWeb.Router.Helpers, as: RouterHelpers @api "v0:dero" def join(@api, _message, socket = %{assigns: %{aid: aid}}) when is_binary(aid) do - Ckd.DeroWallet.subscribe_info({:dero_wallet, aid}) - {:ok, assign(socket, sent_data: nil)} + dero_wallet = Ckd.DeroWallet.subscribe_info({:dero_wallet, aid}) + dero_queue_pending_item = Ckd.DeroQueue.subscribe_info({:dero_queue_pending_item, aid}) + + IO.inspect(dero_queue_pending_item) + + {:ok, + assign(socket, + info: %{dero_wallet: dero_wallet, dero_queue_pending_item: dero_queue_pending_item}, + sent_data: nil + )} end def terminate(_reason, _socket = %{assigns: %{aid: aid}}) when is_binary(aid) do Ckd.DeroWallet.unsubscribe_info({:dero_wallet, aid}) + Ckd.DeroQueue.unsubscribe_info({:dero_queue_pending_item, aid}) end def handle_info( - {{:dero_wallet, aid}, - %{address: address, height: height, balance: balance, unlocked_balance: unlocked_balance}}, - socket = %{assigns: %{aid: aid, sent_data: sent_data}} + {{:dero_wallet, aid}, dero_wallet}, + socket = %{assigns: %{aid: aid, info: info_was, sent_data: sent_data}} ) - when is_binary(aid) and is_binary(address) and is_number(height) and is_number(balance) and - is_number(unlocked_balance) do - new_data = %{ - address: address, - height: height, - balance: "#{balance}", - unlockedBalance: "#{unlocked_balance}" - } + when is_binary(aid) do + new_info = info_was |> Map.put(:dero_wallet, dero_wallet) + new_data = {data_topic, data_payload} = data_from(new_info, aid: aid) if new_data != sent_data do - push(socket, "DATA", new_data) + push(socket, data_topic, data_payload) end - {:noreply, assign(socket, sent_data: new_data)} + {:noreply, assign(socket, info: new_info, sent_data: new_data)} + end + + def handle_info( + {{:dero_queue_pending_item, aid}, dero_queue_pending_item}, + socket = %{assigns: %{aid: aid, info: info_was, sent_data: sent_data}} + ) + when is_binary(aid) do + new_info = info_was |> Map.put(:dero_queue_pending_item, dero_queue_pending_item) + new_data = {data_topic, data_payload} = data_from(new_info, aid: aid) + + if new_data != sent_data do + push(socket, data_topic, data_payload) + end + + {:noreply, assign(socket, info: new_info, sent_data: new_data)} + end + + defp data_from( + _info = %{ + dero_wallet: + {:ok, + %{ + address: address, + height: height, + balance: balance + }}, + dero_queue_pending_item: dero_queue_pending_item + }, + aid: aid + ) + when is_binary(aid) do + {"DATA", + %{ + address: address, + height: height, + balance: %{ + dero: Ckd.Util.to_balance(balance), + deroFormatted: Ckd.Util.to_balance_formatted(balance) + }, + pendingAction: + case dero_queue_pending_item do + nil -> + nil + + %{status: status, qid: qid} when is_atom(status) and is_binary(qid) -> + %{ + status: status, + authorizeUrl: + RouterHelpers.authorize_m_url(CkdWeb.Endpoint, :authorize_m, aid, qid) + } + end + }} + end + + # TODO: send error as part of "DATA"? to preserve pendingAction + defp data_from( + _info = %{ + dero_wallet: + {:error, + %{ + code: error_code, + message: error_message + }}, + dero_queue_pending_item: _ + }, + _ + ) do + {"DATA__ERROR", %{code: error_code, message: error_message}} end end diff --git a/lib/ckd_web/channels/v0/m_channel.ex b/lib/ckd_web/channels/v0/m_channel.ex new file mode 100644 index 0000000..63e7f95 --- /dev/null +++ b/lib/ckd_web/channels/v0/m_channel.ex @@ -0,0 +1,113 @@ +defmodule CkdWeb.Channels.V0.MChannel do + @moduledoc false + + use Phoenix.Channel + require Logger + + @api "v0:m:" + + def join(@api <> scid, _message, socket = %{assigns: %{aid: aid}}) + when is_binary(scid) and is_binary(aid) do + m = Ckd.M.subscribe_info({:m, aid: aid, scid: scid}) + {:ok, assign(socket, scid: scid, qid: nil, info: %{m: m}, sent_data: nil)} + end + + def terminate(_reason, _socket = %{assigns: %{aid: aid, scid: scid, qid: qid}}) + when is_binary(aid) and is_binary(scid) do + Ckd.M.unsubscribe_info({:m, aid: aid, scid: scid}) + + if qid != nil do + Ckd.DeroQueue.unsubscribe_info({:dero_queue_item, aid: aid, qid: qid}) + end + end + + def handle_info( + {{:m, aid: aid, scid: scid}, new_m_info}, + socket = %{assigns: %{aid: aid, scid: scid, info: info_was, sent_data: sent_data}} + ) + when is_binary(aid) and is_binary(scid) do + new_info = Map.put(info_was, :m, new_m_info) + + new_data = {data_topic, data_payload} = data_from(new_info) + + if new_data != sent_data do + push(socket, data_topic, data_payload) + end + + {:noreply, assign(socket, info: new_info, sent_data: new_data)} + end + + def handle_info( + {{:dero_queue_item, aid: aid, qid: qid}, %{qid: qid, status: :completed}}, + socket = %{assigns: %{aid: aid, qid: qid}} + ) + when is_binary(aid) and is_binary(qid) do + Ckd.DeroQueue.unsubscribe_info({:dero_queue_item, aid: aid, qid: qid}) + push(socket, "GIVE__RESET_FORM", %{}) + {:noreply, assign(socket, qid: nil)} + end + + def handle_info( + {{:dero_queue_item, aid: aid, qid: qid}, nil}, + socket = %{assigns: %{aid: aid, qid: qid}} + ) + when is_binary(aid) and is_binary(qid) do + Ckd.DeroQueue.unsubscribe_info({:dero_queue_item, aid: aid, qid: qid}) + push(socket, "GIVE__CANCEL_FORM", %{}) + {:noreply, assign(socket, qid: nil)} + end + + def handle_info({{:dero_queue_item, _}, _}, socket) do + {:noreply, socket} + end + + def handle_in( + "GIVE", + %{"amount" => %{"dero" => dero}, "anon" => anon?}, + socket = %{assigns: %{aid: aid, scid: scid}} + ) + when is_binary(dero) and is_boolean(anon?) and is_binary(aid) and is_binary(scid) do + case Ckd.M.queue_give(aid, %{scid: scid, dero: dero, anon?: anon?}) do + {:ok, _action = %{qid: qid}} when is_binary(qid) -> + Ckd.DeroQueue.subscribe_info({:dero_queue_item, aid: aid, qid: qid}) + push(socket, "GIVE__OK", %{}) + {:noreply, assign(socket, qid: qid)} + + {:error, message} -> + push(socket, "GIVE__ERROR", %{code: :give_error, message: message}) + {:noreply, socket} + end + end + + defp data_from(%{ + m: + {:ok, + %{ + dao: %{balance: %{dero_f: dao_balance_dero_f}, info: dao_info}, + wallet: %{balance: %{stars_f: wallet_balance_stars_f}} + }} + }) + when is_map(dao_info) and is_number(wallet_balance_stars_f) and + is_number(dao_balance_dero_f) do + {"DATA", + %{ + dao: %{ + balance: %{ + dero: Ckd.Util.to_balance(dao_balance_dero_f), + deroFormatted: Ckd.Util.to_balance_formatted(dao_balance_dero_f) + }, + info: dao_info + }, + wallet: %{ + balance: %{ + stars: Ckd.Util.to_balance(wallet_balance_stars_f), + starsFormatted: Ckd.Util.to_balance_formatted(wallet_balance_stars_f) + } + } + }} + end + + defp data_from(%{m: {:error, error = %{code: _code, message: _message}}}) do + {"DATA__ERROR", error} + end +end diff --git a/lib/ckd_web/channels/v0/rtbl_channel.ex b/lib/ckd_web/channels/v0/rtbl_channel.ex new file mode 100644 index 0000000..9ebdf06 --- /dev/null +++ b/lib/ckd_web/channels/v0/rtbl_channel.ex @@ -0,0 +1,96 @@ +defmodule CkdWeb.Channels.V0.RtblChannel do + @moduledoc false + + use Phoenix.Channel + require Logger + + @api "v0:rtbl:" + + def join(@api <> scid, _message, socket = %{assigns: %{aid: aid}}) + when is_binary(scid) and is_binary(aid) do + # aid = "26Qjb3ioHipYYTFRie1QMVTt5u4" + # _mscid = "a2978f328fde829b8ba27b9c7813b683057300deca6f3839ab13e82d927968a9" + # scid = "95d3577cb9b44f28fe74b13dc59f068a41092227175ed54be8513ae0f94f58f6" + rtbl = Ckd.Rtbl.subscribe_info({:rtbl, aid: aid, scid: scid}) + {:ok, assign(socket, scid: scid, qid: nil, info: %{rtbl: rtbl}, sent_data: nil)} + end + + def terminate(_reason, _socket = %{assigns: %{aid: aid, scid: scid, qid: qid}}) + when is_binary(aid) and is_binary(scid) do + Ckd.Rtbl.unsubscribe_info({:rtbl, aid: aid, scid: scid}) + + if qid != nil do + Ckd.DeroQueue.unsubscribe_info({:dero_queue_item, aid: aid, qid: qid}) + end + end + + def handle_info( + {{:rtbl, aid: aid, scid: scid}, new_rtbl_info}, + socket = %{assigns: %{aid: aid, scid: scid, info: info_was, sent_data: sent_data}} + ) + when is_binary(aid) and is_binary(scid) do + new_info = Map.put(info_was, :rtbl, new_rtbl_info) + + new_data = {data_topic, data_payload} = data_from(new_info) + + if new_data != sent_data do + push(socket, data_topic, data_payload) + end + + {:noreply, assign(socket, info: new_info, sent_data: new_data)} + end + + def handle_info( + {{:dero_queue_item, aid: aid, qid: qid}, %{qid: qid, status: :completed}}, + socket = %{assigns: %{aid: aid, qid: qid}} + ) + when is_binary(aid) and is_binary(qid) do + Ckd.DeroQueue.unsubscribe_info({:dero_queue_item, aid: aid, qid: qid}) + # push(socket, "RAISE__RESET_FORM", %{}) + {:noreply, assign(socket, qid: nil)} + end + + def handle_info( + {{:dero_queue_item, aid: aid, qid: qid}, nil}, + socket = %{assigns: %{aid: aid, qid: qid}} + ) + when is_binary(aid) and is_binary(qid) do + Ckd.DeroQueue.unsubscribe_info({:dero_queue_item, aid: aid, qid: qid}) + # push(socket, "RAISE__CANCEL_FORM", %{}) + {:noreply, assign(socket, qid: nil)} + end + + def handle_info({{:dero_queue_item, _}, _}, socket) do + {:noreply, socket} + end + + def handle_in( + "BOOST", + %{"amount" => %{"stars" => stars}, "post" => %{"rawText" => raw_text}}, + socket = %{assigns: %{aid: aid, scid: scid}} + ) + when is_binary(stars) and is_binary(raw_text) and is_binary(aid) and is_binary(scid) do + push(socket, "BOOST__OK", %{}) + Logger.info(call: {aid, %{scid: scid, stars: stars, post: raw_text}}) + {:noreply, socket} + # case Ckd.Rtbl.queue_boost(aid, %{scid: scid, stars: stars, post: raw_text}) do + # {:ok, _action = %{qid: qid}} when is_binary(qid) -> + # Ckd.DeroQueue.subscribe_info({:dero_queue_item, aid: aid, qid: qid}) + # push(socket, "BOOST__OK", %{}) + # {:noreply, assign(socket, qid: qid)} + + # {:error, message} -> + # push(socket, "BOOST__ERROR", %{code: :give_dero_error, message: message}) + # {:noreply, socket} + # end + end + + defp data_from(%{rtbl: {:ok, %{posts: posts}}}) do + {"DATA", %{posts: posts}} + end + + # TODO + # defp data_from(%{: {:error, error = %{code: _code, message: _message}}}) do + # {"DATA__ERROR", error} + # end +end diff --git a/lib/ckd_web/channels/v0/sign_in_channel.ex b/lib/ckd_web/channels/v0/sign_in_channel.ex index 1b2e55b..5601ee8 100644 --- a/lib/ckd_web/channels/v0/sign_in_channel.ex +++ b/lib/ckd_web/channels/v0/sign_in_channel.ex @@ -222,8 +222,8 @@ defmodule CkdWeb.Channels.V0.SignInChannel do {:reply, :ok, assign(socket, :awaiting_token_lid, lid)} - {:error, e} -> - Logger.error(e) + {:error, message} -> + Logger.error(message: message, source: :sign_in_channel) {:reply, :ok, socket} end end diff --git a/lib/ckd_web/live/authorize_login_live.ex b/lib/ckd_web/live/authorize_login_live.ex index 1103916..f396d01 100644 --- a/lib/ckd_web/live/authorize_login_live.ex +++ b/lib/ckd_web/live/authorize_login_live.ex @@ -4,7 +4,8 @@ defmodule CkdWeb.AuthorizeLoginLive do use CkdWeb, :live_view @impl true - @spec mount(any, any, any) :: {:ok, Phoenix.LiveView.Socket.t()} + @spec mount(map, any, %{:root_pid => any, optional(any) => any}) :: + {:ok, Phoenix.LiveView.Socket.t()} def mount(%{"lid" => lid}, _session, socket = %{root_pid: root_pid}) when is_binary(lid) do %{logins: logins, accounts: accounts} = case root_pid do @@ -12,8 +13,7 @@ defmodule CkdWeb.AuthorizeLoginLive do Ckd.State.read(:ckd_state) _root_pid -> - {:ok, ckd_state} = Ckd.State.subscribe_info(:ckd_state) - ckd_state + Ckd.State.subscribe_info(:ckd_state) end login = Enum.find(logins, fn l -> l.lid == lid end) diff --git a/lib/ckd_web/live/authorize_m_live.ex b/lib/ckd_web/live/authorize_m_live.ex new file mode 100644 index 0000000..cb09470 --- /dev/null +++ b/lib/ckd_web/live/authorize_m_live.ex @@ -0,0 +1,130 @@ +defmodule CkdWeb.AuthorizeMLive do + use CkdWeb, :live_view + + @impl true + @spec mount(any, any, any) :: {:ok, Phoenix.LiveView.Socket.t()} + def mount(%{"aid" => aid, "qid" => qid}, _session, socket = %{root_pid: root_pid}) + when is_binary(aid) and is_binary(qid) do + account = mount_account(aid: aid, root_pid: root_pid) + dero_wallet = mount_dero_wallet(aid: aid, root_pid: root_pid) + + dero_queue_item = + %{action: {:m_give, %{scid: m_scid}}} = + mount_dero_queue_item(aid: aid, qid: qid, root_pid: root_pid) + + m = mount_m(aid: aid, scid: m_scid, root_pid: root_pid) + + {:ok, + assign(socket, + aid: aid, + qid: qid, + m_scid: m_scid, + info: %{account: account, dero_wallet: dero_wallet, dero_queue_item: dero_queue_item, m: m} + )} + end + + @impl true + @spec terminate(any, %{ + :assigns => %{:aid => binary, :m_scid => binary, :qid => binary, optional(any) => any}, + optional(any) => any + }) :: :ok + def terminate(_reason, _socket = %{assigns: %{aid: aid, qid: qid, m_scid: m_scid}}) + when is_binary(aid) and is_binary(qid) and is_binary(m_scid) do + Ckd.State.unsubscribe_info({:account, aid}) + Ckd.DeroWallet.unsubscribe_info({:dero_wallet, aid}) + Ckd.DeroQueue.unsubscribe_info({:dero_queue_item, aid: aid, qid: qid}) + Ckd.M.unsubscribe_info({:m, aid: aid, scid: m_scid}) + end + + @impl true + def handle_event("authorize_action", _params, socket = %{assigns: %{aid: aid, qid: qid}}) + when is_binary(aid) and is_binary(qid) do + Ckd.DeroQueue.authorize_action(aid: aid, qid: qid) + {:noreply, socket} + end + + @impl true + def handle_event("cancel_action", _params, socket = %{assigns: %{aid: aid, qid: qid}}) + when is_binary(aid) and is_binary(qid) do + Ckd.DeroQueue.cancel_action(aid: aid, qid: qid) + {:noreply, socket} + end + + @impl true + def handle_info( + {{:account, aid}, account}, + socket = %{assigns: %{aid: aid, info: info_was}} + ) + when is_binary(aid) do + {:noreply, assign(socket, info: Map.put(info_was, :account, account))} + end + + @impl true + def handle_info( + {{:dero_wallet, aid}, dero_wallet}, + socket = %{assigns: %{aid: aid, info: info_was}} + ) + when is_binary(aid) do + {:noreply, assign(socket, info: Map.put(info_was, :dero_wallet, dero_wallet))} + end + + @impl true + def handle_info( + {{:dero_queue_item, aid: aid, qid: qid}, dero_queue_item}, + socket = %{assigns: %{aid: aid, qid: qid, info: info}} + ) + when is_binary(aid) and is_binary(qid) do + {:noreply, assign(socket, info: Map.put(info, :dero_queue_item, dero_queue_item))} + end + + @impl true + def handle_info( + {{:m, aid: aid, scid: m_scid}, m}, + socket = %{assigns: %{aid: aid, m_scid: m_scid, info: info}} + ) + when is_binary(aid) and is_binary(m_scid) do + {:noreply, assign(socket, info: Map.put(info, :m, m))} + end + + defp mount_account(aid: aid, root_pid: root_pid) when is_binary(aid) do + case root_pid do + nil -> + Ckd.State.read({:account, aid}) + + _root_pid -> + Ckd.State.subscribe_info({:account, aid}) + end + end + + defp mount_dero_wallet(aid: aid, root_pid: root_pid) when is_binary(aid) do + case root_pid do + nil -> + Ckd.DeroWallet.read({:dero_wallet, aid}) + + _root_pid -> + Ckd.DeroWallet.subscribe_info({:dero_wallet, aid}) + end + end + + defp mount_dero_queue_item(aid: aid, qid: qid, root_pid: root_pid) + when is_binary(aid) and is_binary(qid) do + case root_pid do + nil -> + Ckd.DeroQueue.read({:dero_queue_item, aid: aid, qid: qid}) + + _root_pid -> + Ckd.DeroQueue.subscribe_info({:dero_queue_item, aid: aid, qid: qid}) + end + end + + defp mount_m(aid: aid, scid: scid, root_pid: root_pid) + when is_binary(aid) and is_binary(scid) do + case root_pid do + nil -> + Ckd.M.read({:m, aid: aid, scid: scid}) + + _root_pid -> + Ckd.M.subscribe_info({:m, aid: aid, scid: scid}) + end + end +end diff --git a/lib/ckd_web/live/authorize_m_live.html.leex b/lib/ckd_web/live/authorize_m_live.html.leex new file mode 100644 index 0000000..c73d20b --- /dev/null +++ b/lib/ckd_web/live/authorize_m_live.html.leex @@ -0,0 +1,137 @@ +

<%= @info.account.display_name %>

+ +

+ DERO balance: + <%= case @info.dero_wallet do %> + <% {:ok, wallet} -> %> + <%= Ckd.Util.to_balance_formatted(wallet.balance) %> + <% {:error, error} -> %> + <%= Jason.encode!(error) %> + <% end %> +

+ +

+ <%= case @info.m do %> + <% {:ok, %{dao: %{info: %{ticker: ticker}}, wallet: %{balance: %{stars_f: wallet_balance_stars_f}}}} -> %> + m*<%= ticker %> balance: + <%= Ckd.Util.to_balance_formatted(wallet_balance_stars_f) %> + <% {:error, error} -> %> + <%= Jason.encode!(error) %> + <% nil -> %> + - + <% end %> +

+ +
+ +
+ <%= case @info do %> + <% %{dero_queue_item: %{status: status, action: {:m_give, %{dero_i: dero_i, scid: _scid, anon?: anon?}}, txid: txid}, + m: {:ok, %{dao: %{info: %{website: dao_website, ticker: dao_ticker}}}}} -> %> +

+ <%= if status == :completed do %> + <%= URI.parse(dao_website).host %> thanks you… + <% else %> + <%= URI.parse(dao_website).host %> wants you to… + <% end %> +

+ +
+
+ <%= if status == :completed do %> +

Gave DERO

+ <% else %> +

Give DERO

+ <% end %> +

DERO: <%= Ckd.Util.to_balance_formatted(dero_i / 100_000) %>

+
+
+ <%= if status == :awaiting_authorization do %> + + <% end %> + + <%= if status == :processing do %> + + <% end %> +
+
+
+ +
+ <%= case {status, anon?} do %> + <% {_, true} -> %> +

Anonymously

+ <% {:completed, false} -> %> +

Got Stars*

+ <% _ -> %> +

Get Stars*

+ <% end %> + + <%= if anon? do %> +

No Stars

+ <% else %> +

m*<%= dao_ticker %>: <%= Ckd.Util.to_balance_formatted(dero_i * 1_000_000 / 100_000) %>

+ <% end %> + +
+
+ <%= if status == :awaiting_authorization do %> + + <% end %> + + <%= if status == :processing do %> + + <% end %> +
+
+
+
+ + <%= if txid != nil do %> +

+ <%= if (status == :completed) do %> + ✅ + <% else %> + ⏰ + <% end %> +

+

+ txid: + <%= txid %> +

+ <% end %> + + <% %{dero_queue_item: nil} -> %> + nothing queued + + <% %{m: {:error, error}} -> %> + error: <%= Jason.encode!(error) %> + <% end %> +
+ +
+ +
+ <%= case @info.m do %> + <% {:ok, %{code: sc_code, dao: %{info: %{name: dao_name, purpose: dao_purpose}, balance: %{dero_f: dao_balance_dero_f}}}} -> %> +

<%= dao_name %>

+

<%= dao_purpose %>

+

scid: <%= @m_scid %>

+

sc balance: <%= Ckd.Util.to_balance_formatted(dao_balance_dero_f) %>

+
<%= sc_code %>
+ <% {:error, %{code: error_code, message: error_message}} -> %> +
<%= error_code %>: <%= error_message %>
+ <% nil -> %> + <% end %> +
+ + diff --git a/lib/ckd_web/live/home_live.ex b/lib/ckd_web/live/home_live.ex index ef9f75e..99e45ff 100644 --- a/lib/ckd_web/live/home_live.ex +++ b/lib/ckd_web/live/home_live.ex @@ -33,8 +33,7 @@ defmodule CkdWeb.HomeLive do Ckd.State.read(:ckd_state) _root_pid -> - {:ok, ckd_state} = Ckd.State.subscribe_info(:ckd_state) - ckd_state + Ckd.State.subscribe_info(:ckd_state) end {:ok, diff --git a/lib/ckd_web/live/home_live.html.leex b/lib/ckd_web/live/home_live.html.leex index 87cdec2..b938968 100644 --- a/lib/ckd_web/live/home_live.html.leex +++ b/lib/ckd_web/live/home_live.html.leex @@ -113,6 +113,17 @@
<%= account.display_name %>