Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ operators/gdocs
assets/_private
examples/codegen
.parcel-cache
.claude/settings.local.json
2 changes: 1 addition & 1 deletion effect/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@rxfx/effect",
"version": "1.1.4",
"version": "1.1.6",
"license": "MIT",
"author": "Dean Radcliffe",
"repository": "https://github.com/deanrad/rxfx",
Expand Down
126 changes: 110 additions & 16 deletions effect/src/createEffect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ import {
takeUntil,
tap,
} from 'rxjs/operators';
import { EffectRunner, EffectSource } from './types';
import {
EffectRunner,
EffectSource,
LifecycleReducerEvent,
ProcessLifecycleCallbacks,
} from './types';

const allShutdowns = new Subject<void>();

Expand All @@ -41,16 +46,22 @@ export function shutdownAll() {
* @param handler The Promise or Observable-returning effect function - the EffectSource
* @param concurrencyOperator The concurrency-control function (defaults to `mergeMap` aka Immediate)
* @summary ![immediate mode](https://d2jksv3bi9fv68.cloudfront.net/rxfx/mode-immediate-sm.png) */
export function createEffect<Request, Response = void, TError = Error>(
export function createEffect<
Request,
Response = void,
TError extends Error = Error,
TState = Response
>(
handler: EffectSource<Request, Response>,
concurrencyOperator = mergeMap
): EffectRunner<Request, Response, TError> {
): EffectRunner<Request, Response, TError, TState> {
const errors = new Subject<TError>();

const currentError = new BehaviorSubject<TError | null>(null);
const lastResponse = new BehaviorSubject<Response | null>(null);
const state = new BehaviorSubject(null);
const state = new BehaviorSubject<TState | null>(null);

const incoming = new Subject<Request>();
const requests = new Subject<Request>();
const responses = new Subject<Response>();
const starts = new Subject<Request>();
Expand Down Expand Up @@ -160,6 +171,7 @@ export function createEffect<Request, Response = void, TError = Error>(
const executor = function Effect(req: Request) {
handlings.next(
new Observable((notify) => {
incoming.next(req);
requests.next(req);
notify.complete();
})
Expand Down Expand Up @@ -200,6 +212,56 @@ export function createEffect<Request, Response = void, TError = Error>(
isHandling,
isActive,
state,

reduceWith(
reducer: (
state: TState,
evt: LifecycleReducerEvent<Request, Response, Error>
) => TState,
initial: TState
) {
const events$ = merge(
incoming.pipe(
map((payload) => ({ type: 'request' as const, payload }))
),
starts.pipe(map((payload) => ({ type: 'started' as const, payload }))),
responses.pipe(
map((payload) => ({ type: 'response' as const, payload }))
),
completions.pipe(
map((payload) => ({ type: 'complete' as const, payload }))
),
cancelations.pipe(
map((payload) => ({ type: 'canceled' as const, payload }))
)
);
state.next(initial);
mainSub.add(events$.pipe(scan(reducer, initial)).subscribe(state));
return state as BehaviorSubject<TState>;
},
observe(fns: Partial<ProcessLifecycleCallbacks<Request, Response, Error>>) {
const streams = [
tap(fns.request ? fns.request : noop)(requests),
tap(fns.started ? fns.started : noop)(starts),
tap(fns.response ? fns.response : noop)(responses),
tap(fns.complete ? fns.complete : noop)(completions),
tap(fns.canceled ? fns.canceled : noop)(cancelations),
tap(fns.error ? fns.error : noop)(errors),
];
const allEvents = merge(...streams).pipe(
tap({
finalize: fns.finalized,
})
);
const sub = allEvents.subscribe();
sub.add(
merge(completions, cancelations)
.pipe(tap(fns.finalized ?? noop))
.subscribe()
);

return sub;
},
};

// The first batch starts us listening
Expand All @@ -211,38 +273,58 @@ export function createEffect<Request, Response = void, TError = Error>(
/** Creates an Effect - A higher-order wrapper around a Promise-or-Observable returning function.
* The effect is cancelable if it returns an Observable. `createQueueingEffect` runs in concurrency mode: "queueing" aka `concatMap`.
* @summary ![queueing mode](https://d2jksv3bi9fv68.cloudfront.net/rxfx/mode-queueing-sm.png) */
export function createQueueingEffect<Request, Response>(
export function createQueueingEffect<
Request,
Response = void,
TError extends Error = Error,
TState = Response
>(
handler: EffectSource<Request, Response>
) {
): EffectRunner<Request, Response, TError, TState> {
return createEffect(handler, concatMap);
}

/** Creates an Effect - A higher-order wrapper around a Promise-or-Observable returning function.
* The effect is cancelable if it returns an Observable. `createSwitchingEffect` runs in concurrency mode: "switching" aka `switchMap`.
* @summary ![switching mode](https://d2jksv3bi9fv68.cloudfront.net/rxfx/mode-switching-sm.png) */
export function createSwitchingEffect<Request, Response>(
export function createSwitchingEffect<
Request,
Response = void,
TError extends Error = Error,
TState = Response
>(
handler: EffectSource<Request, Response>
) {
): EffectRunner<Request, Response, TError, TState> {
return createEffect(handler, switchMap);
}

/** Creates an Effect - A higher-order wrapper around a Promise-or-Observable returning function.
* The effect is cancelable if it returns an Observable. `createBlockingEffect` runs in concurrency mode: "blocking" aka `exhaustMap`.
* @summary ![blocking mode](https://d2jksv3bi9fv68.cloudfront.net/rxfx/mode-blocking-sm.png)
*/
export function createBlockingEffect<Request, Response>(
export function createBlockingEffect<
Request,
Response = void,
TError extends Error = Error,
TState = Response
>(
handler: EffectSource<Request, Response>
) {
): EffectRunner<Request, Response, TError, TState> {
return createEffect(handler, exhaustMap);
}

/** Creates an Effect - A higher-order wrapper around a Promise-or-Observable returning function.
* The effect is cancelable if it returns an Observable. `createTogglingEffect` runs in concurrency mode: "blocking" aka `exhaustMap`.
* @summary ![toggling mode](https://d2jksv3bi9fv68.cloudfront.net/rxfx/mode-toggling-sm.png)
*/
export function createTogglingEffect<Request, Response>(
export function createTogglingEffect<
Request,
Response = void,
TError extends Error = Error,
TState = Response
>(
handler: EffectSource<Request, Response>
) {
): EffectRunner<Request, Response, TError, TState> {
return createEffect(handler, toggleMap as typeof mergeMap);
}

Expand All @@ -257,9 +339,14 @@ export const DEFAULT_DEBOUNCE_INTERVAL = 330;
export function createThrottledEffect(
msec: number = DEFAULT_DEBOUNCE_INTERVAL
) {
return function <Request, Response = void>(
return function <
Request,
Response = void,
TError extends Error = Error,
TState = Response
>(
handler: EffectSource<Request, Response>
) {
): EffectRunner<Request, Response, TError, TState> {
return createEffect((args: Request) => {
return concat(
// do the work up front
Expand All @@ -279,9 +366,14 @@ export function createThrottledEffect(
export function createDebouncedEffect(
msec: number = DEFAULT_DEBOUNCE_INTERVAL
) {
return function <Request, Response = void>(
return function <
Request,
Response = void,
TError extends Error = Error,
TState = Response
>(
handler: EffectSource<Request, Response>
) {
): EffectRunner<Request, Response, TError, TState> {
return createEffect((args: Request) => {
return concat(
// wait initially
Expand All @@ -301,3 +393,5 @@ export function createCustomEffect<Request, Response>(
) {
return createEffect(handler, concurrencyOperator);
}

function noop() {}
8 changes: 8 additions & 0 deletions effect/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
export { concat, EMPTY, throwError } from 'rxjs';
export * from './createEffect';
// Concurrency mode operator aliases
export {
mergeMap as immediate,
concatMap as queueing,
exhaustMap as blocking,
switchMap as switching,
switchMap as replacing,
} from 'rxjs/operators';
65 changes: 56 additions & 9 deletions effect/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import { BehaviorSubject, Observable, ObservableInput } from 'rxjs';
import {
BehaviorSubject,
Observable,
ObservableInput,
Subscription,
} from 'rxjs';

/** An EffectSource is an async function, or a function with a Promise, Observable, or Iterable return value,
* whose lifecycle events will be exposed to the EffectRunner.
Expand All @@ -25,36 +30,78 @@ export interface Cancelable {

/** The Stateful interface represents information the EffectRunner retains about previous or current effect executions.
*/
export interface Stateful<Response, TError> {
export interface Stateful<Response, TError, TState> {
lastResponse: BehaviorSubject<Response | null>;
currentError: BehaviorSubject<TError | null>;
isHandling: BehaviorSubject<boolean>;
isActive: BehaviorSubject<boolean>;
/** Reserved for future use. */
state: BehaviorSubject<null>;
state: BehaviorSubject<TState | null>;
}

/** The Events interface contains Observables that an effect triggerer may use to get updates on executions. */
export interface Events<Request, Response, TError> {
errors: Observable<TError>;
responses: Observable<Response>;
starts: Observable<Request>;
responses: Observable<Response>;
errors: Observable<TError>;
completions: Observable<Request>;
cancelations: Observable<Request>;
}

/**
* An EffectRunner is a function, enhanced with Observable properties
*/
export interface EffectRunner<Request, Response, TError = Error>
extends EffectTriggerer<Request>,
export interface EffectRunner<
Request,
Response,
TError extends Error = Error,
TState = Response
> extends EffectTriggerer<Request>,
Cancelable,
Events<Request, Response, TError>,
Stateful<Response, TError> {
Stateful<Response, TError, TState> {
request: (req: Request) => void;

send: (
req: Request,
matcher?: (req: Request, res: Response) => boolean
) => Promise<Response>;

/** Populates #state via the reducer. Meant to be called only once, before */
reduceWith: (
reducer: (
state: TState,
evt: LifecycleReducerEvent<Request, Response, Error>
) => TState,
initial: TState
) => BehaviorSubject<TState>;

observe(
callbacks: Partial<ProcessLifecycleCallbacks<Request, Response, Error>>
): Subscription;
}

export type LifecycleReducerEvent<Req, Res, Err> =
| { type: 'request'; payload: Req }
| { type: 'started'; payload: Req }
| { type: 'response'; payload: Res }
| { type: 'complete'; payload?: Req }
| { type: 'error'; payload: Err }
| { type: 'canceled'; payload?: Req };

/** Callbacks corresponding to lifecycle events of a process. */
export interface ProcessLifecycleCallbacks<TRequest, TNext, TError = Error> {
/** invokes the effects */
request: (r: TRequest) => void;
/** an invocation has begun */
started: (r: TRequest) => void;
/** an invocation has produced data */
response: (next: TNext) => void;
/** an invocation has terminated with an error */
error: (err: TError) => void;
/** an invocation has terminated successfully */
complete: (r: TRequest) => void;
/** an invocation was canceled by a subscriber */
canceled: (r: TRequest) => void;
/** an invocation concluded, in any fashion */
finalized: () => void;
}
Loading