Module riak_pipe_vnode_worker

Basic worker process implementation, to be parameterized with a fitting implementation module.

Behaviours: gen_fsm.

This module defines the riak_pipe_vnode_worker behaviour.
Required callback functions: init/2, process/3, done/1.

Description

Modules that implement this behavior need to export at least three functions:

  init(Partition :: riak_pipe_vnode:partition(),
       FittingDetails :: riak_pipe_fitting:details())
    -> {ok, ModuleState :: term()}

The init/2 function is called when the worker starts. The module should do whatever it needs to get ready before processing inputs. The module will probably also want to store the Partition and FittingDetails arguments in its state, as it will need them to send outputs later. The ModuleState returned from this function will be passed to the process/3 function later.
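As a minimal sketch of the three required callbacks, assuming a hypothetical module named example_pass_worker, the following fitting forwards every input unchanged. init/2 keeps Partition and FittingDetails in its state because send_output/3 needs both later. The -behaviour attribute only lets the compiler check the callbacks when riak_pipe is on the code path; without riak_pipe the compiler merely warns.

```erlang
%% Sketch of a pass-through fitting; example_pass_worker is a hypothetical name.
-module(example_pass_worker).
-behaviour(riak_pipe_vnode_worker).  % checked only if riak_pipe is on the code path

-export([init/2, process/3, done/1]).

%% Partition and FittingDetails are kept because send_output/3 needs them.
-record(state, {partition, details}).

init(Partition, FittingDetails) ->
    {ok, #state{partition = Partition, details = FittingDetails}}.

%% Forward each input unchanged to the next fitting; crash on a send error.
process(Input, _LastInPreflist, #state{partition = P, details = FD} = State) ->
    ok = riak_pipe_vnode_worker:send_output(Input, P, FD),
    {ok, State}.

%% Nothing is buffered, so there is nothing left to send or clean up.
done(_State) ->
    ok.
```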

  process(Input :: term(),
          LastInPreflist :: boolean(),
          ModuleState :: term())
    -> {ok, NewModuleState :: term()}
     | forward_preflist.

The process/3 function is called once for each input delivered to the worker. The module should do whatever processing is needed, sending outputs if appropriate. The NewModuleState returned from process/3 will be passed back in on the next input.

LastInPreflist indicates whether this worker is the last in the partition preflist for this input. When this parameter is false, the function may return the atom forward_preflist to have the input sent to the next vnode in the preference list. When this parameter is true, returning forward_preflist will cause an error trace message to be generated, with the reason preflist_exhausted.
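The sketch below shows one way a worker might use LastInPreflist. The module name example_lookup_worker and its data map are hypothetical: the map stands in for whatever local storage a real vnode would consult, and is simply left empty here. A missing key is forwarded to the next vnode while one remains, and the last vnode reports it as not_found instead of returning forward_preflist, which would trigger the preflist_exhausted error.

```erlang
%% Sketch only: example_lookup_worker and its data map are hypothetical.
-module(example_lookup_worker).
-behaviour(riak_pipe_vnode_worker).

-export([init/2, process/3, done/1]).

%% data stands in for this vnode's local storage (empty in this sketch).
-record(state, {partition, details, data = #{}}).

init(Partition, FittingDetails) ->
    {ok, #state{partition = Partition, details = FittingDetails}}.

process(Key, LastInPreflist, #state{data = Data} = State) ->
    case {maps:find(Key, Data), LastInPreflist} of
        {{ok, Value}, _} ->
            emit({Key, Value}, State);
        {error, false} ->
            %% Not stored here, and another vnode remains: let it try.
            forward_preflist;
        {error, true} ->
            %% Last vnode: forwarding now would yield preflist_exhausted,
            %% so report the miss downstream instead.
            emit({Key, not_found}, State)
    end.

done(_State) ->
    ok.

%% Send one output to the next fitting and keep the state unchanged.
emit(Output, #state{partition = P, details = FD} = State) ->
    ok = riak_pipe_vnode_worker:send_output(Output, P, FD),
    {ok, State}.
```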

  done(ModuleState :: term()) -> ok.

The done/1 function is called when all inputs have been processed, and the end-of-inputs flag has been received from the fitting. The module should clean up any resources it needs to clean up, and send any outputs it still needs to send. When done/1 returns, the worker will terminate.
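For a done/1 that still has output to send, consider this hypothetical counting fitting, example_count_worker (a sketch, not a fitting shipped with riak_pipe): process/3 only updates its state, and done/1 emits the final count once the end-of-inputs flag has arrived.

```erlang
%% Sketch only: example_count_worker is a hypothetical fitting.
-module(example_count_worker).
-behaviour(riak_pipe_vnode_worker).

-export([init/2, process/3, done/1]).

-record(state, {partition, details, count = 0}).

init(Partition, FittingDetails) ->
    {ok, #state{partition = Partition, details = FittingDetails}}.

%% Count inputs only; nothing is sent until end-of-inputs.
process(_Input, _LastInPreflist, #state{count = N} = State) ->
    {ok, State#state{count = N + 1}}.

%% All inputs seen: emit this worker's count; the worker then terminates.
done(#state{partition = P, details = FD, count = N}) ->
    ok = riak_pipe_vnode_worker:send_output({count, P, N}, P, FD).
```

Because each vnode runs its own worker, every partition emits its own partial count; a downstream fitting would have to sum them.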

There are also four optional functions that a worker behavior module can export:

  validate_arg(Arg :: term()) -> ok | {error, Reason :: iolist()}.

The validate_arg/1 function is called before a pipeline is constructed. If the behavior module exports this function, then it will be evaluated on the value of the arg field of a #fitting_spec{} record that points to this module. If the argument is valid, this function should return the atom ok. If the argument is invalid, the function should return an error tuple, with the Reason being a printable iolist.
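A possible validate_arg/1, sketched for a hypothetical fitting whose arg must be a function of arity 1. Only this callback is shown; a complete module would also export init/2, process/3 and done/1. The Reason is built with io_lib:format/2, whose character-list result serves as the printable iolist.

```erlang
%% Sketch only: example_validate_worker is a hypothetical module.
-module(example_validate_worker).
-export([validate_arg/1]).

%% Accept only a 1-arity fun; reject anything else with a printable message.
validate_arg(Fun) when is_function(Fun, 1) ->
    ok;
validate_arg(Other) ->
    {error, io_lib:format("~p requires a function of arity 1, not ~p",
                          [?MODULE, Other])}.
```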

  archive(ModuleState :: term()) -> {ok, Archive :: term()}.

The archive/1 function is called when the vnode that owns this worker is being handed off to another node. The worker should produce an Erlang term that represents its state. This Archive term will be passed to the handoff/2 function of the module by the worker running on the handoff target.

  handoff(Archive :: term(),
          ModuleState :: term()) ->
     {ok, NewModuleState :: term()}.

The handoff/2 function is called when a vnode receives a handoff archive from another vnode. The module should "merge" the Archive with its ModuleState (in whatever sense "merge" may mean for this fitting), and return the resulting NewModuleState.
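Continuing the counting idea, archive/1 and handoff/2 for a hypothetical module named example_handoff_worker might look like this. Only the count needs to travel: the worker on the handoff target already has its own Partition and FittingDetails from init/2, so "merging" two counters simply means adding them.

```erlang
%% Sketch only: example_handoff_worker is a hypothetical module.
-module(example_handoff_worker).
-export([init/2, archive/1, handoff/2]).

-record(state, {partition, details, count = 0}).

init(Partition, FittingDetails) ->
    {ok, #state{partition = Partition, details = FittingDetails}}.

%% Only the count is archived; the target worker has its own
%% Partition and FittingDetails from its init/2.
archive(#state{count = N}) ->
    {ok, N}.

%% Merging a counter archive means adding it to the local count.
handoff(ArchivedCount, #state{count = N} = State) ->
    {ok, State#state{count = N + ArchivedCount}}.
```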

  no_input_run_reduce_once() -> boolean().

If this function is exported and returns true, and a fitting has no input (as measured by having zero workers), then a "fake" worker is started for the sole purpose of running its computation once and sending some output downstream. Currently, the only fitting that needs this feature is riak_kv_w_reduce.erl, which must be able to run its reduce function once (with an empty list as input) to maintain full compatibility with Riak KV's Map/Reduce.

For riak_kv_w_reduce.erl and any other pipe behavior callback module where this function returns true, the #fitting_details.options property list will contain the property pipe_fitting_no_input to indicate that the fitting has no input.
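A sketch of the opt-in, assuming a hypothetical module example_no_input_worker. The helper no_input/1 is also hypothetical: in a real fitting, Options would be read from the options field of the #fitting_details{} record defined in riak_pipe's header file, which is left out here to keep the example self-contained. proplists:get_bool/2 treats a bare pipe_fitting_no_input atom and a {pipe_fitting_no_input, true} tuple alike.

```erlang
%% Sketch only: module and helper names are hypothetical.
-module(example_no_input_worker).
-export([no_input_run_reduce_once/0, no_input/1]).

%% Ask for one "fake" worker even when this fitting receives no input.
no_input_run_reduce_once() ->
    true.

%% True when the options mark this worker as the no-input worker.
%% In a real module, Options would come from FittingDetails.
no_input(Options) ->
    proplists:get_bool(pipe_fitting_no_input, Options).
```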

Data Types

state()

abstract datatype: state()

Function Index

behaviour_info/1 - Get information about this behavior.
code_change/4 - Unused.
handle_event/3 - Unused.
handle_info/3 - Unused.
handle_sync_event/4 - Unused.
init/1 - Initialize the worker.
initial_input_request/2 - The worker has just started, and should request its first input from its owning vnode.
recurse_input/3 - Equivalent to recurse_input(Input, FromPartition, Details, noblock).
recurse_input/4 - Send a new input from this fitting to itself.
send_archive/1 - Ask the worker to archive itself.
send_handoff/2 - Ask the worker to merge handoff data from an archived worker.
send_input/2 - Send input to the worker.
send_output/3 - Equivalent to send_output(Output, FromPartition, Details, infinity).
send_output/4 - Send output from the given fitting to the next fitting down the line.
send_output/5 - Send output from the given fitting to a specific fitting.
start_link/3 - Start a worker for the specified fitting+vnode.
terminate/3 - Unused.
wait_for_input/2 - The worker has requested its next input item, and is waiting for it.

Function Details

behaviour_info/1

behaviour_info(Other::atom()) -> undefined | [{atom(), arity()}]

Get information about this behavior.

code_change/4

code_change(OldVsn::term(), StateName::atom(), State::state(), Extra::term()) -> {ok, atom(), state()}

Unused.

handle_event/3

handle_event(Event::term(), StateName::atom(), State::state()) -> {next_state, atom(), state()}

Unused.

handle_info/3

handle_info(Info::term(), StateName::atom(), State::state()) -> {next_state, atom(), state()}

Unused.

handle_sync_event/4

handle_sync_event(Event::term(), From::term(), StateName::atom(), State::state()) -> {reply, ok, atom(), state()}

Unused.

init/1

init(X1::[riak_pipe_vnode:partition() | pid() | riak_pipe_fitting:details()]) -> {ok, initial_input_request, state(), 0} | {stop, {init_failed, term(), term()}}

Initialize the worker. This function calls the implementing module's init function. If that init function fails, the worker stops with an {init_failed, Type, Error} reason.

initial_input_request/2

initial_input_request(X1::timeout, State::state()) -> {next_state, wait_for_input, state()}

The worker has just started, and should request its first input from its owning vnode. This is done after a zero timeout, instead of in the init function, to avoid a deadlock: the vnode is waiting for this process to finish starting, so a worker that waited in init for a message from the vnode would never receive it.
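The zero-timeout trick is not specific to gen_fsm. The sketch below reproduces it with gen_server instead (gen_fsm has been deprecated since OTP 20); the module name example_zero_timeout and the {next_input_please, Pid} message are hypothetical. The owner, standing in for the vnode, is blocked in start_link/1 until init/1 returns, so the request is sent from the timeout handler rather than from init/1.

```erlang
%% Sketch of the zero-timeout start-up trick using gen_server.
-module(example_zero_timeout).
-behaviour(gen_server).

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link(Owner) ->
    gen_server:start_link(?MODULE, Owner, []).

init(Owner) ->
    %% Owner is still blocked in start_link/1 here, so waiting on it
    %% would deadlock. Return a 0 timeout and ask for input afterwards.
    {ok, Owner, 0}.

%% Fires right after init/1 returns, once Owner can respond.
handle_info(timeout, Owner) ->
    Owner ! {next_input_please, self()},
    {noreply, Owner};
handle_info(_Other, Owner) ->
    {noreply, Owner}.

handle_call(_Request, _From, Owner) ->
    {reply, ok, Owner}.

handle_cast(_Msg, Owner) ->
    {noreply, Owner}.
```

A zero timeout fires only if no other message is already waiting when init/1 returns, so the pattern relies on nobody messaging the new process before it asks for input.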

recurse_input/3

recurse_input(Input, FromPartition, Details) -> any()

Equivalent to recurse_input(Input, FromPartition, Details, noblock).

recurse_input/4

recurse_input(Input::term(), FromPartition::riak_pipe_vnode:partition(), Details::riak_pipe_fitting:details(), Timeout::riak_pipe_vnode:qtimeout()) -> ok | {error, term()}

Send a new input from this fitting to itself. This can be used to write fittings that perform recursive calculations, where steps in the recursion might be done in parallel.

For example, when walking intermediate tree nodes, using recurse_input/3 to send children to other vnodes, instead of processing them in the same worker, may be a useful strategy.

WARNING: Using recurse_input with a Timeout of infinity is discouraged, unless you can guarantee that the queues for a fitting will never be full. Otherwise, it's possible to deadlock a fitting by blocking on enqueueing an input for a worker that is blocking on enqueueing an input for the sender (circular blocking). Use noblock and handle timeout failures to prevent deadlock.

Internal details: This works because of the nature of the blocking enqueue operation. It is guaranteed that as long as this worker is alive, the fitting for which it works will not receive all of its done messages. So, the vnode that enqueues this input will still be able to ask the fitting for details, and the fitting will know that it has to wait on that vnode.
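The noblock advice can be sketched as follows, for a hypothetical fitting example_tree_worker whose inputs are {node, Children} or {leaf, Value} terms. Each child is offered to the fitting with recurse_input/4 and noblock; any child whose enqueue attempt returned an error (such as a full queue) is walked locally instead, so the worker never blocks on a queue.

```erlang
%% Sketch only: example_tree_worker and its input shapes are hypothetical.
-module(example_tree_worker).
-behaviour(riak_pipe_vnode_worker).

-export([init/2, process/3, done/1, not_enqueued/1]).

-record(state, {partition, details}).

init(Partition, FittingDetails) ->
    {ok, #state{partition = Partition, details = FittingDetails}}.

process(Input, _LastInPreflist, State) ->
    ok = walk(Input, State),
    {ok, State}.

done(_State) ->
    ok.

%% Leaves become outputs of this fitting.
walk({leaf, Value}, #state{partition = P, details = FD}) ->
    ok = riak_pipe_vnode_worker:send_output(Value, P, FD);
%% Children are offered to other workers without blocking; the ones
%% that could not be enqueued are walked right here.
walk({node, Children}, #state{partition = P, details = FD} = State) ->
    Results = [{Child, riak_pipe_vnode_worker:recurse_input(Child, P, FD, noblock)}
               || Child <- Children],
    lists:foreach(fun(Child) -> ok = walk(Child, State) end,
                  not_enqueued(Results)).

%% Children whose recurse_input/4 call did not return ok.
not_enqueued(Results) ->
    [Child || {Child, Result} <- Results, Result =/= ok].
```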

send_archive/1

send_archive(WorkerPid::pid()) -> ok

Ask the worker to archive itself. The worker will send the archive data to the owning vnode when it has done so. Once it has sent the archive, the worker shuts down normally.

send_handoff/2

send_handoff(WorkerPid::pid(), Archive::term()) -> ok

Ask the worker to merge handoff data from an archived worker. Note: this should only be called by the vnode that owns the worker, as the result of the worker asking for its next input when the vnode has received handoff data for the worker's fitting.

send_input/2

send_input(WorkerPid::pid(), Input::done | {term(), riak_core_apl:preflist()}) -> ok

Send input to the worker. Note: this should only be called by the vnode that owns the worker, as the result of the worker asking for its next input.

send_output/3

send_output(Output, FromPartition, Details) -> any()

Equivalent to send_output(Output, FromPartition, Details, infinity).

send_output/4

send_output(Output::term(), FromPartition::riak_pipe_vnode:partition(), Fitting_details::riak_pipe_fitting:details(), Timeout::riak_pipe_vnode:qtimeout()) -> ok | {error, term()}

Send output from the given fitting to the next fitting down the line. FromPartition is used when the next fitting's partition function is follow, that is, when the output should go to the vnode for the same partition that produced it.

send_output/5

send_output(Output::term(), FromPartition::riak_pipe_vnode:partition(), Details::riak_pipe_fitting:details(), FittingOverride::riak_pipe:fitting(), Timeout::riak_pipe_vnode:qtimeout()) -> ok | {error, term()}

Send output from the given fitting to a specific fitting. This is most often used to send output to the sink, but it is also the internal implementation behind send_output/3 and send_output/4.

start_link/3

start_link(Partition::riak_pipe_vnode:partition(), VnodePid::pid(), FittingDetails::riak_pipe_fitting:details()) -> {ok, pid()} | ignore | {error, term()}

Start a worker for the specified fitting+vnode.

terminate/3

terminate(Reason::term(), StateName::atom(), State::state()) -> ok

Unused.

wait_for_input/2

wait_for_input(X1::{input, done | {term(), riak_core_apl:preflist()}} | {handoff, term()} | archive, State::state()) -> {next_state, wait_for_input, state()} | {stop, normal, state()}

The worker has requested its next input item, and is waiting for it.

If the input is done (signaling end-of-inputs from the fitting), the implementing module's done function is evaluated, and then the worker terminates normally.

If the input is any regular input, then the implementing module's process function is evaluated. When it finishes, the next input is requested from the vnode.

If the input is a handoff from another vnode, the worker asks the implementing module to merge the archive, if the module exports that functionality.

If the input is a request to archive, the worker asks the implementing module to archive itself, if the module exports that functionality. When archiving has finished, the worker terminates normally.
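The optional-callback checks described above can be modelled as below. This is a simplified sketch, not riak_pipe's implementation: the module example_optional_callbacks is hypothetical, and returning undefined when archive/1 is not exported is an assumption. erlang:function_exported/3 reports false for modules that are not loaded yet, hence the code:ensure_loaded/1 call.

```erlang
%% Simplified model only; not the code of riak_pipe_vnode_worker.
-module(example_optional_callbacks).
-export([merge_handoff/3, make_archive/2]).
%% Toy callbacks, exported so the helpers can be tried on this module.
-export([handoff/2, archive/1]).

%% Merge an archive only if Mod exports handoff/2; otherwise keep ModState.
merge_handoff(Mod, Archive, ModState) ->
    case exports(Mod, handoff, 2) of
        true ->
            {ok, NewModState} = Mod:handoff(Archive, ModState),
            NewModState;
        false ->
            ModState
    end.

%% Archive only if Mod exports archive/1; 'undefined' is this sketch's fallback.
make_archive(Mod, ModState) ->
    case exports(Mod, archive, 1) of
        true ->
            {ok, Archive} = Mod:archive(ModState),
            Archive;
        false ->
            undefined
    end.

%% function_exported/3 only sees loaded modules, so load Mod first.
exports(Mod, Fun, Arity) ->
    _ = code:ensure_loaded(Mod),
    erlang:function_exported(Mod, Fun, Arity).

%% Toy counter callbacks: the archive is the count; merging adds counts.
handoff(ArchivedCount, Count) -> {ok, Count + ArchivedCount}.
archive(Count) -> {ok, Count}.
```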


Generated by EDoc, Nov 29 2012, 06:39:33.