-
Notifications
You must be signed in to change notification settings - Fork 2
Add versionedjob contrib package with basic versioning framework
#39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
brandur
wants to merge
1
commit into
master
Choose a base branch
from
brandur-versioned-job
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,9 @@ | ||
| go 1.24.2 | ||
| go 1.25.5 | ||
|
|
||
| use ( | ||
| ./datadogriver | ||
| ./otelriver | ||
| ./nilerror | ||
| ./otelriver | ||
| ./panictoerror | ||
| ./versionedjob | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,167 @@ | ||
| # versionedjob [](https://github.com/riverqueue/rivercontrib/actions) [](https://pkg.go.dev/github.com/riverqueue/rivercontrib/versionedjob) | ||
|
|
||
| Provides a River hook with a simple job versioning framework. **Version transformers** are written for versioned jobs containing procedures for upgrading jobs that were encoded as older versions to the most modern version. This allows for workers to be implemented as if all job versions will be the most modern version only, keeping code simpler. | ||
|
|
||
| ```go | ||
| // VersionTransformer defines how to perform transformations between versions | ||
| // for a specific job kind. | ||
| type VersionTransformer interface { | ||
| // Kind is the job kind that this transformer applies to. | ||
| Kind() string | ||
|
|
||
| // VersionTransform applies version transformations to the given job. Version | ||
| // transformations are fully defined according to the user, as well as how a | ||
| // version is extracted from the job's args. | ||
| // | ||
| // Generally, this function should extract a version from the job, then | ||
| // apply versions one by one until it's fully modernized to the point where | ||
| // it can be successfully run by its worker. | ||
| VersionTransform(job *rivertype.JobRow) error | ||
| } | ||
| ``` | ||
|
|
||
| ## Example | ||
|
|
||
| Below are three versions of the same job: `VersionedJobArgsV1`, `VersionedJobArgsV2`, and the current version, `VersionedJobArgs`. From V1 to V2, `name` was renamed to `title`, and a `version` field added to track version. In V3, a new `description` property was added. A real program would only keep the latest version (`VersionedJobArgs`), but this example shows all three for reference. | ||
|
|
||
| ```go | ||
| type VersionedJobArgsV1 struct { | ||
| Name string `json:"name"` | ||
| } | ||
|
|
||
| type VersionedJobArgsV2 struct { | ||
| Title string `json:"title"` | ||
| Version int `json:"version"` | ||
| } | ||
|
|
||
| type VersionedJobArgs struct { | ||
| Description string `json:"description"` | ||
| Title string `json:"title"` | ||
| Version int `json:"version"` | ||
| } | ||
| ``` | ||
|
|
||
| The worker for `VersionedJobArgs` is written so it only handles the latest version (`title` instead of `name` and assumes `description` is present). This is possible because a `VersionTransformer` will handle migrating jobs from old versions to new ones before they hit the worker. | ||
|
|
||
| ```go | ||
| type VersionedJobWorker struct { | ||
| river.WorkerDefaults[VersionedJobArgs] | ||
| } | ||
|
|
||
| func (w *VersionedJobWorker) Work(ctx context.Context, job *river.Job[VersionedJobArgs]) error { | ||
| fmt.Printf("Job title: %s; description: %s\n", job.Args.Title, job.Args.Description) | ||
| return nil | ||
| } | ||
| ``` | ||
|
|
||
| The `VersionTransformer` implementation handles version upgrades one by one. Jobs which are multiple versions old can still be upgraded because multiple version changes can be applied in one go. This implementation uses `gjson`/`sjson` so that each change need only know a minimum about the data object in question and that unknown fields are retained. Other approaches are possible though, including using only Go's built-in `gjson` package. | ||
|
|
||
| ```go | ||
| type VersionedJobTransformer struct{} | ||
|
|
||
| func (*VersionedJobTransformer) Kind() string { return (VersionedJobArgs{}).Kind() } | ||
|
|
||
| func (*VersionedJobTransformer) VersionTransform(ctx context.Context, job *rivertype.JobRow) error { | ||
| // Extract version from job, defaulting to 1 if not present because we | ||
| // assume that was before versioning was introduced. | ||
| version := cmp.Or(gjson.GetBytes(job.EncodedArgs, "version").Int(), 1) | ||
|
|
||
| var err error | ||
|
|
||
| // | ||
| // Here, we walk through each successive version, applying transformations | ||
| // to bring it to its next version. If a job is multiple versions behind, | ||
| // version transformations are one-by-one applied in order until the job's | ||
| // args are fully modernized. | ||
| // | ||
|
|
||
| // Version change: V1 --> V2 | ||
| if version < 2 { | ||
| version = 2 | ||
|
|
||
| job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "title", gjson.GetBytes(job.EncodedArgs, "name").String()) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| job.EncodedArgs, err = sjson.DeleteBytes(job.EncodedArgs, "name") | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| // Version change: V2 --> V3 | ||
| if version < 3 { | ||
| version = 3 | ||
|
|
||
| title := gjson.GetBytes(job.EncodedArgs, "title").String() | ||
| if title == "" { | ||
| return errors.New("no title found in job args") | ||
| } | ||
|
|
||
| job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "description", "A description of a "+title+".") | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| // Not strictly necessary, but set version to latest. | ||
| job.EncodedArgs, err = sjson.SetBytes(job.EncodedArgs, "version", version) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
| ``` | ||
|
|
||
| A River client is initialized with the `versiondjob` hook and transformer installed: | ||
|
|
||
| ```go | ||
| riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ | ||
| Hooks: []rivertype.Hook{ | ||
| versionedjob.NewHook(&versionedjob.HookConfig{ | ||
| Transformers: []versionedjob.VersionTransformer{ | ||
| &VersionedJobTransformer{}, | ||
| }, | ||
| }), | ||
| }, | ||
| }) | ||
| if err != nil { | ||
| panic(err) | ||
| } | ||
| ``` | ||
|
|
||
| With all that in place, a job of any version can be inserted and thanks to the version transformer modernizing the older ones, the worker will produce the same result regardless of input. | ||
|
|
||
| ```go | ||
| if _, err = riverClient.InsertMany(ctx, []river.InsertManyParams{ | ||
| { | ||
| Args: VersionedJobArgsV1{ | ||
| Name: "My Job", | ||
| }, | ||
| }, | ||
| { | ||
| Args: VersionedJobArgsV2{ | ||
| Title: "My Job", | ||
| Version: 2, | ||
| }, | ||
| }, | ||
| { | ||
| Args: VersionedJobArgs{ | ||
| Title: "My Job", | ||
| Description: "A description of a My Job.", | ||
| Version: 3, | ||
| }, | ||
| }, | ||
| }); err != nil { | ||
| panic(err) | ||
| } | ||
| ``` | ||
|
|
||
| ```go | ||
| // Output: | ||
| // Job title: My Job; description: A description of a My Job. | ||
| // Job title: My Job; description: A description of a My Job. | ||
| // Job title: My Job; description: A description of a My Job. | ||
| ``` | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| module github.com/riverqueue/rivercontrib/versionedjob | ||
|
|
||
| go 1.25.5 | ||
|
|
||
| require ( | ||
| github.com/jackc/pgx/v5 v5.7.6 | ||
| github.com/riverqueue/river v0.29.0 | ||
| github.com/riverqueue/river/riverdriver/riverpgxv5 v0.29.0 | ||
| github.com/riverqueue/river/rivershared v0.29.0 | ||
| github.com/riverqueue/river/rivertype v0.29.0 | ||
| github.com/stretchr/testify v1.11.1 | ||
| github.com/tidwall/gjson v1.18.0 | ||
| github.com/tidwall/sjson v1.2.5 | ||
| ) | ||
|
|
||
| require ( | ||
| github.com/davecgh/go-spew v1.1.1 // indirect | ||
| github.com/jackc/pgpassfile v1.0.0 // indirect | ||
| github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect | ||
| github.com/jackc/puddle/v2 v2.2.2 // indirect | ||
| github.com/pmezard/go-difflib v1.0.0 // indirect | ||
| github.com/riverqueue/river/riverdriver v0.29.0 // indirect | ||
| github.com/tidwall/match v1.2.0 // indirect | ||
| github.com/tidwall/pretty v1.2.1 // indirect | ||
| go.uber.org/goleak v1.3.0 // indirect | ||
| golang.org/x/crypto v0.45.0 // indirect | ||
| golang.org/x/sync v0.19.0 // indirect | ||
| golang.org/x/text v0.32.0 // indirect | ||
| gopkg.in/yaml.v3 v3.0.1 // indirect | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
| github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= | ||
| github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= | ||
| github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0= | ||
| github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds= | ||
| github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= | ||
| github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= | ||
| github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= | ||
| github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= | ||
| github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk= | ||
| github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= | ||
| github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= | ||
| github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= | ||
| github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= | ||
| github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= | ||
| github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= | ||
| github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= | ||
| github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= | ||
| github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= | ||
| github.com/riverqueue/river v0.29.0 h1:PMO4k6n7HcIjjgrbnG2UG04Exh8aLmQksOddOoYDASA= | ||
| github.com/riverqueue/river v0.29.0/go.mod h1:S8BbQbxCrJLYygmnrnraltHhWlGzZzwjqcRbY3wdq7w= | ||
| github.com/riverqueue/river/riverdriver v0.29.0 h1:o7mV07RPXrGJdwXUKxVTOyvG1/cDmJIMI3V4Le4/LBo= | ||
| github.com/riverqueue/river/riverdriver v0.29.0/go.mod h1:bmkdn74EG4Ogsv44JkC1CBxFZ3JHfYsN+e0K8Dq0otU= | ||
| github.com/riverqueue/river/riverdriver/riverpgxv5 v0.29.0 h1:l3D17JWq/00QEt0bcawyDMxZYmM1YAk11Y/nRRVk5C8= | ||
| github.com/riverqueue/river/riverdriver/riverpgxv5 v0.29.0/go.mod h1:mpncN3m7DR7VpD78LV5CczbSpwkWcLeJ5j1kkJiOt9s= | ||
| github.com/riverqueue/river/rivershared v0.29.0 h1:Niwbmp/CQAKPZ+zT3teCgEmPhksyW0f2cx4X03FurEk= | ||
| github.com/riverqueue/river/rivershared v0.29.0/go.mod h1:74WjXTYKV4nTfLemIPloPqiA3Tjqe5BFvnALrNbS62k= | ||
| github.com/riverqueue/river/rivertype v0.29.0 h1:26hpzbd44piqJZ+1zO4RO6GRKpmZVX3Ncx+Ki+w2gtg= | ||
| github.com/riverqueue/river/rivertype v0.29.0/go.mod h1:rWpgI59doOWS6zlVocROcwc00fZ1RbzRwsRTU8CDguw= | ||
| github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= | ||
| github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= | ||
| github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= | ||
| github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= | ||
| github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= | ||
| github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= | ||
| github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= | ||
| github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= | ||
| github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= | ||
| github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= | ||
| github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= | ||
| github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= | ||
| github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= | ||
| github.com/tidwall/match v1.2.0 h1:0pt8FlkOwjN2fPt4bIl4BoNxb98gGHN2ObFEDkrfZnM= | ||
| github.com/tidwall/match v1.2.0/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= | ||
| github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= | ||
| github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= | ||
| github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= | ||
| github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= | ||
| github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= | ||
| go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= | ||
| go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= | ||
| golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= | ||
| golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= | ||
| golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= | ||
| golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= | ||
| golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= | ||
| golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= | ||
| gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= | ||
| gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= | ||
| gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= | ||
| gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= | ||
| gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= | ||
| gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume you meant
encoding/json?