diff --git a/README.md b/README.md index 9ee896a..dcdf16b 100644 --- a/README.md +++ b/README.md @@ -1,32 +1,20 @@ -[![Mutation testing badge](https://img.shields.io/endpoint?style=flat&url=https%3A%2F%2Fbadge-api.stryker-mutator.io%2Fgithub.com%2Fpatchlevel%2Fworker%2F1.5.x)](https://dashboard.stryker-mutator.io/reports/github.com/patchlevel/worker/1.5.x) +[![Mutation testing badge](https://img.shields.io/endpoint?style=flat&url=https%3A%2F%2Fbadge-api.stryker-mutator.io%2Fgithub.com%2Fpatchlevel%2Fworker%2F1.6.x)](https://dashboard.stryker-mutator.io/reports/github.com/patchlevel/worker/1.6.x) [![Latest Stable Version](https://poser.pugx.org/patchlevel/worker/v)](//packagist.org/packages/patchlevel/worker) [![License](https://poser.pugx.org/patchlevel/worker/license)](//packagist.org/packages/patchlevel/worker) # Worker -A small library to build stable, long-running workers that terminate properly when limits are exceeded. -A worker executes a job in a loop and stops gracefully when a run, memory or time limit is reached or -a `SIGTERM` signal is received. This makes it a good fit for daemonized console commands managed by -supervisor, systemd, Docker or Kubernetes, where you want processes to restart cleanly instead of being killed mid-job. - -It was originally part of the [event-sourcing](https://github.com/patchlevel/event-sourcing) library -and has been extracted as a standalone library. +A small library to build stable, long-running workers that terminate gracefully when limits are exceeded +or a SIGTERM signal is received. Perfect for daemonized console commands running under +Docker, Kubernetes, supervisor or systemd, where the process manager restarts the worker after it exits. ## Features -* Graceful shutdown: the worker always finishes the current job before stopping -* Stop the worker after a maximum number of iterations -* Stop the worker when a memory limit is exceeded -* Stop the worker after a time limit -* Stop the worker on `SIGTERM` (e.g. sent by supervisor, Docker or Kubernetes) -* Configurable sleep time between iterations -* PSR-3 logging and Symfony event dispatcher integration - -## Requirements - -* PHP 8.2+ -* [symfony/event-dispatcher](https://github.com/symfony/event-dispatcher) -* The [pcntl](https://www.php.net/manual/en/book.pcntl.php) extension (optional, needed for `SIGTERM` handling) +* Configurable run, memory and time [limits](https://patchlevel.dev/docs/worker/latest/getting-started#limits) +* [Graceful shutdown](https://patchlevel.dev/docs/worker/latest/getting-started#graceful-shutdown-on-sigterm) on SIGTERM +* Extensible via [events and custom listeners](https://patchlevel.dev/docs/worker/latest/events) +* [PSR-3 logging](https://patchlevel.dev/docs/worker/latest/getting-started#logging) of the worker lifecycle +* Plays well with [Symfony and Laravel console commands](https://patchlevel.dev/docs/worker/latest/integration) ## Installation @@ -34,232 +22,7 @@ and has been extracted as a standalone library. composer require patchlevel/worker ``` -## Usage - -The easiest way to create a worker is the `DefaultWorker::create` factory method. -It takes the job as a closure, an array of options and optionally a PSR-3 logger: - -```php -use Patchlevel\Worker\DefaultWorker; - -$worker = DefaultWorker::create( - function (Closure $stop): void { - // do something, e.g. consume a message from a queue - - if (/* some condition */) { - $stop(); // stop the worker from inside the job - } - }, - [ - 'runLimit' => 100, // stop after 100 iterations - 'memoryLimit' => '512MB', // stop if memory usage exceeds 512MB - 'timeLimit' => 3600, // stop after one hour (in seconds) - ], -); - -$worker->run(1000); // sleep up to 1000ms between iterations -``` - -The job receives a `$stop` closure as its first argument. Calling it stops the worker -after the current iteration — useful when the job itself detects that there is nothing left to do. - -### Options - -All options are optional. If an option is not set, the corresponding limit is not enforced. - -| Option | Type | Description | -|---------------|----------------|------------------------------------------------------------------------------| -| `runLimit` | `positive-int` | Stop the worker after this number of job runs | -| `memoryLimit` | `string` | Stop the worker if memory usage exceeds this limit (e.g. `128MB`, `1GB`) | -| `timeLimit` | `positive-int` | Stop the worker after this number of seconds | - -The `memoryLimit` string consists of a number and a unit. Allowed units are `B`, `KB`, `MB` and `GB` -(case-insensitive). If no unit is given, bytes are assumed. - -> [!NOTE] -> Limits are checked *after* each job run. The current job is always finished before the worker stops, -> so the actual runtime or memory usage may exceed the configured limit slightly. - -### Sleep timer - -The `run` method accepts a sleep timer in milliseconds (default: `1000`): - -```php -$worker->run(500); -``` - -The time the job took is subtracted from the sleep timer. If a job run takes longer than -the sleep timer, the next run starts immediately. Pass `0` to disable sleeping entirely. - -### Stopping the worker - -There are several ways the worker can be stopped: - -* Calling the `$stop` closure inside the job -* Calling `$worker->stop()` from outside (e.g. from an event listener) -* Reaching one of the configured limits (`runLimit`, `memoryLimit`, `timeLimit`) -* Receiving a `SIGTERM` signal (requires the `pcntl` extension) - -In all cases the worker finishes the current job and then exits the loop gracefully. - -### Logging - -`DefaultWorker::create` accepts any PSR-3 logger as the third argument. -The worker logs each iteration at `debug` level and the reason for stopping at `info` level: - -```php -use Patchlevel\Worker\DefaultWorker; -use Symfony\Component\Console\Logger\ConsoleLogger; - -$logger = new ConsoleLogger($output); - -$worker = DefaultWorker::create( - function (): void { - // do something - }, - ['runLimit' => 100], - $logger, -); -``` +## Documentation -## Events - -The worker dispatches the following events via the Symfony event dispatcher. -Each event exposes the worker instance as a public readonly property `$worker`, -so listeners can call `$event->worker->stop()`. - -| Event | Dispatched | -|----------------------|------------------------------------------------------------| -| `WorkerStartedEvent` | Once, before the first job run | -| `WorkerRunningEvent` | After every job run | -| `WorkerStoppedEvent` | Once, after the worker loop has finished | - -All events live in the `Patchlevel\Worker\Event` namespace. - -### Custom event listeners - -You can pass your own event dispatcher as the fourth argument of `DefaultWorker::create` -and register custom listeners or subscribers on it: - -```php -use Patchlevel\Worker\DefaultWorker; -use Patchlevel\Worker\Event\WorkerRunningEvent; -use Symfony\Component\EventDispatcher\EventDispatcher; - -$eventDispatcher = new EventDispatcher(); -$eventDispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event): void { - if (/* some condition, e.g. a new deployment was detected */) { - $event->worker->stop(); - } -}); - -$worker = DefaultWorker::create( - function (): void { - // do something - }, - [], - eventDispatcher: $eventDispatcher, -); -``` - -## Listeners - -Internally, the limits are implemented as event subscribers in the `Patchlevel\Worker\Listener` namespace. -`DefaultWorker::create` registers them automatically based on the given options, -but you can also use them directly with your own event dispatcher: - -| Listener | Registered by | Description | -|---------------------------------------|------------------------------|---------------------------------------------------| -| `StopWorkerOnIterationLimitListener` | `runLimit` option | Stops the worker after N job runs | -| `StopWorkerOnMemoryLimitListener` | `memoryLimit` option | Stops the worker when memory usage exceeds limit | -| `StopWorkerOnTimeLimitListener` | `timeLimit` option | Stops the worker after N seconds | -| `StopWorkerOnSigtermSignalListener` | always (if pcntl is loaded) | Stops the worker on `SIGTERM` | - -## Full example: Symfony console command - -A typical use case is a long-running console command where the limits are configurable via CLI options: - -```php -addOption( - 'run-limit', - null, - InputOption::VALUE_OPTIONAL, - 'The maximum number of runs this command should execute', - 1 - ) - ->addOption( - 'memory-limit', - null, - InputOption::VALUE_REQUIRED, - 'How much memory consumption should the worker be terminated (500MB, 1GB, etc.)' - ) - ->addOption( - 'time-limit', - null, - InputOption::VALUE_REQUIRED, - 'What is the maximum time the worker can run in seconds' - ) - ->addOption( - 'sleep', - null, - InputOption::VALUE_REQUIRED, - 'How much time should elapse before the next job is executed in milliseconds', - 1000 - ); - } - - protected function execute(InputInterface $input, OutputInterface $output): int - { - $logger = new ConsoleLogger($output); - - $worker = DefaultWorker::create( - function ($stop): void { - // do something - - if (/* some condition */) { - $stop(); - } - }, - [ - 'runLimit' => $input->getOption('run-limit'), - 'memoryLimit' => $input->getOption('memory-limit'), - 'timeLimit' => $input->getOption('time-limit'), - ], - $logger - ); - - $worker->run((int)$input->getOption('sleep')); - - return 0; - } -} -``` - -The command can then be run like this: - -```bash -bin/console app:worker --run-limit=100 --memory-limit=512MB --time-limit=3600 --sleep=1000 -``` +* [Documentation](https://patchlevel.dev/docs/worker/latest) +* Related [Blog](https://patchlevel.dev/blog) diff --git a/docs/events.md b/docs/events.md new file mode 100644 index 0000000..8c68a5e --- /dev/null +++ b/docs/events.md @@ -0,0 +1,52 @@ +# Events & Listeners + +Internally the worker is built on the symfony event dispatcher. It dispatches three events, +each carrying the worker instance: + +| Event | Dispatched | +|----------------------|-------------------------------------| +| `WorkerStartedEvent` | once, before the first iteration | +| `WorkerRunningEvent` | after every iteration | +| `WorkerStoppedEvent` | once, after the worker has stopped | + +All [limits](getting-started.md#limits) are implemented as event subscribers +(`StopWorkerOnIterationLimitListener`, `StopWorkerOnMemoryLimitListener`, +`StopWorkerOnTimeLimitListener`, `StopWorkerOnSigtermSignalListener`), +so you can add your own stop conditions the same way: + +```php +use Patchlevel\Worker\Event\WorkerRunningEvent; +use Symfony\Component\EventDispatcher\EventSubscriberInterface; + +final class StopWorkerOnNewDeploymentListener implements EventSubscriberInterface +{ + public function onWorkerRunning(WorkerRunningEvent $event): void + { + if (/* new version deployed */) { + $event->worker->stop(); + } + } + + public static function getSubscribedEvents(): array + { + return [WorkerRunningEvent::class => 'onWorkerRunning']; + } +} +``` + +Pass your own event dispatcher to `create` to register additional listeners: + +```php +use Patchlevel\Worker\DefaultWorker; +use Symfony\Component\EventDispatcher\EventDispatcher; + +$eventDispatcher = new EventDispatcher(); +$eventDispatcher->addSubscriber(new StopWorkerOnNewDeploymentListener()); + +$worker = DefaultWorker::create( + $job, + ['timeLimit' => 3600], + $logger, + $eventDispatcher, +); +``` diff --git a/docs/getting-started.md b/docs/getting-started.md new file mode 100644 index 0000000..c33c58e --- /dev/null +++ b/docs/getting-started.md @@ -0,0 +1,86 @@ +# Getting Started + +The easiest way to create a worker is the `DefaultWorker::create` factory. +It takes the job to execute, an array of limit options and an optional PSR-3 logger: + +```php +use Patchlevel\Worker\DefaultWorker; + +$worker = DefaultWorker::create( + function (callable $stop): void { + // do a unit of work + + if (/* nothing left to do */) { + $stop(); + } + }, + [ + 'runLimit' => 100, + 'memoryLimit' => '512MB', + 'timeLimit' => 3600, + ], + $logger, // optional, any PSR-3 logger +); + +$worker->run(); +``` + +The job is executed in a loop until the worker is stopped. +The job receives a `$stop` callback: calling it tells the worker to exit the loop +after the current iteration has finished. The worker never aborts a running job — +stopping always happens *between* iterations, so your job is never interrupted halfway. + +## Limits + +All options are optional. Without limits the worker runs until it is stopped +via `$stop()`, `$worker->stop()` or a SIGTERM signal. + +| Option | Type | Description | +|---------------|----------|---------------------------------------------------------------------------------------------------------------------------------| +| `runLimit` | `int` | Stop after this number of iterations. | +| `memoryLimit` | `string` | Stop when memory usage exceeds this value, e.g. `128MB`. Supported units: `B`, `KB`, `MB`, `GB` (case-insensitive, 1024-based). | +| `timeLimit` | `int` | Stop after this number of seconds. | + +Limits are checked after each iteration. When a limit is exceeded, the worker logs the reason and stops gracefully. + +:::warning +An invalid `memoryLimit` string throws a `Patchlevel\Worker\InvalidFormat` exception. +::: + +:::note +Internally every limit is implemented as an event listener. +You can add your own stop conditions the same way, see [events & listeners](events.md). +::: + +## Graceful shutdown on SIGTERM + +If the `pcntl` extension is available, the worker automatically registers a SIGTERM handler. +When the process receives SIGTERM (e.g. from `docker stop`, a Kubernetes pod shutdown or supervisor), +the worker finishes the current iteration and then exits cleanly. + +This makes the worker a good fit for process managers that send SIGTERM and restart the process, +e.g. to roll out a new version or to keep long-running processes fresh. + +:::warning +Without `ext-pcntl` this feature is not available. +::: + +## Sleep + +`run()` takes a sleep timer in milliseconds (default: `1000`): + +```php +$worker->run(500); // aim for one iteration every 500ms +``` + +The job's own run time is subtracted from the sleep: if the job took 300ms and the sleep timer +is 500ms, the worker only sleeps 200ms. If the job took longer than the sleep timer, +the next iteration starts immediately. Pass `0` to disable sleeping entirely. + +## Logging + +The worker logs its lifecycle (start, iteration timings, sleep, stop reason) to the given PSR-3 logger. +Iteration details use the `debug` level; stop reasons (limit exceeded, SIGTERM received) use `info`. + +With the `ConsoleLogger` from the [Symfony command example](integration.md#symfony), run the command with `-v` +to see stop reasons or `-vvv` to see everything. diff --git a/docs/integration.md b/docs/integration.md new file mode 100644 index 0000000..b415c56 --- /dev/null +++ b/docs/integration.md @@ -0,0 +1,127 @@ +# Integration + +A typical setup is a console command that exposes the [limits](getting-started.md#limits) as options, +so they can be configured per environment. + +## Symfony + +```php + $runLimit, + 'memoryLimit' => $memoryLimit, + 'timeLimit' => $timeLimit, + ], + $logger + ); + + $worker->run($sleep); + + return Command::SUCCESS; + } +} +``` + +Run it with the limits suited to your deployment: + +```bash +bin/console app:worker --time-limit=3600 --memory-limit=512MB -v +``` + +## Laravel + +```php +option('run-limit'); + $timeLimit = $this->option('time-limit'); + + $worker = DefaultWorker::create( + function (callable $stop): void { + // do something + + if (/* some condition */) { + $stop(); + } + }, + [ + 'runLimit' => $runLimit !== null ? (int) $runLimit : null, + 'memoryLimit' => $this->option('memory-limit'), + 'timeLimit' => $timeLimit !== null ? (int) $timeLimit : null, + ], + $logger, + ); + + $worker->run((int) $this->option('sleep')); + + return self::SUCCESS; + } +} +``` + +Run it with the limits suited to your deployment: + +```bash +php artisan app:worker --time-limit=3600 --memory-limit=512MB +``` + +:::note +The injected `LoggerInterface` writes to Laravel's default log channel. +Use a dedicated channel if you want the worker output separated from the rest of your application logs. +::: diff --git a/docs/introduction.md b/docs/introduction.md new file mode 100644 index 0000000..d4dfdfd --- /dev/null +++ b/docs/introduction.md @@ -0,0 +1,25 @@ +# Worker + +A small library to build stable, long-running workers that terminate gracefully when limits are exceeded +or a SIGTERM signal is received. Perfect for daemonized console commands running under +Docker, Kubernetes, supervisor or systemd, where the process manager restarts the worker after it exits. + +It was extracted from the [event-sourcing](https://github.com/patchlevel/event-sourcing) library into a separate package. + +## Features + +* Configurable run, memory and time [limits](getting-started.md#limits) +* [Graceful shutdown](getting-started.md#graceful-shutdown-on-sigterm) on SIGTERM +* Extensible via [events and custom listeners](events.md) +* [PSR-3 logging](getting-started.md#logging) of the worker lifecycle +* Plays well with [Symfony and Laravel console commands](integration.md) + +## Installation + +```bash +composer require patchlevel/worker +``` + +:::tip +Start with the [getting started](getting-started.md) guide to get a feeling for the library. +::: diff --git a/docs/project.json b/docs/project.json new file mode 100644 index 0000000..8575866 --- /dev/null +++ b/docs/project.json @@ -0,0 +1,20 @@ +{ + "navigation": [ + { + "title": "Introduction", + "file": "introduction.md" + }, + { + "title": "Getting Started", + "file": "getting-started.md" + }, + { + "title": "Events & Listeners", + "file": "events.md" + }, + { + "title": "Integration", + "file": "integration.md" + } + ] +}