pg_out: add cdc output mode #5320
Pull request overview
This PR adds CDC (Change Data Capture) mode to the PostgreSQL output connector, allowing users to write all operations as INSERT operations into an append-only event log with metadata columns for operation type and timestamp.
Key changes:
- Added `PostgresWriteMode` enum with `Materialized` (default) and `Cdc` modes
- Added configuration options `cdc_op_column` and `cdc_ts_column` for CDC metadata columns
- Modified SQL query generation to use INSERT for all operations in CDC mode
- Added comprehensive test coverage for CDC mode across all operation types
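To make the shape of the new configuration concrete, here is a minimal sketch in plain Rust. It is not the code from `crates/feldera-types/src/transport/postgres.rs`: `WriterConfigSketch` and its `validate` method are hypothetical names, the real `PostgresWriterConfig` has more fields, and the validation rule shown is an assumption about what a reasonable check might look like. Only the enum name, its two variants, the two option names, and the default column names come from this PR.

```rust
/// Stand-in for the `PostgresWriteMode` enum named in this PR.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PostgresWriteMode {
    /// The default mode, per the PR overview.
    #[default]
    Materialized,
    /// Every change is appended as a new row with metadata columns.
    Cdc,
}

/// Hypothetical, trimmed-down config used only for this sketch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriterConfigSketch {
    pub mode: PostgresWriteMode,
    pub cdc_op_column: String,
    pub cdc_ts_column: String,
}

impl Default for WriterConfigSketch {
    fn default() -> Self {
        Self {
            mode: PostgresWriteMode::default(),
            // Default column names as stated in the PR description.
            cdc_op_column: "__feldera_op".to_string(),
            cdc_ts_column: "__feldera_ts".to_string(),
        }
    }
}

impl WriterConfigSketch {
    /// Assumed rule (not taken from the PR): in CDC mode the two metadata
    /// column names must be non-empty and must differ from each other.
    pub fn validate(&self) -> Result<(), String> {
        if self.mode != PostgresWriteMode::Cdc {
            return Ok(());
        }
        if self.cdc_op_column.is_empty() || self.cdc_ts_column.is_empty() {
            return Err("CDC column names must not be empty".to_string());
        }
        if self.cdc_op_column == self.cdc_ts_column {
            return Err("CDC op and ts columns must differ".to_string());
        }
        Ok(())
    }
}
```

With this sketch, `WriterConfigSketch::default()` yields `Materialized` mode, so the CDC column names are ignored unless `mode` is switched to `Cdc`.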
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| docs.feldera.com/docs/connectors/sinks/postgresql.md | Documents the new CDC mode and configuration options |
| crates/feldera-types/src/transport/postgres.rs | Defines PostgresWriteMode enum and adds CDC configuration fields with validation |
| crates/adapters/src/integrated/postgres/test.rs | Adds CDC test variants and PostgresTestStructCdc type for testing |
| crates/adapters/src/integrated/postgres/prepared_statements.rs | Updates query generation to handle CDC mode INSERT operations |
| crates/adapters/src/integrated/postgres/output.rs | Implements CDC metadata serialization and validation |
a7459fa to 0a393dd
Adds a new config option `mode` to `PostgresWriterConfig`, which enables a `cdc` mode. The operation and timestamp columns are configurable; by default they are `__feldera_op` and `__feldera_ts`:

- `__feldera_op`: `i` for insert operations, `u` for upserts, or `d` for deletes.
- `__feldera_ts`: the UTC timestamp at which the record is serialized.

For simplicity, in `cdc` mode the insert, update, and delete queries are all the same insert query; this lets us reuse code and avoid refactors.

Signed-off-by: Abhinav Gyawali <22275402+abhizer@users.noreply.github.com>
0a393dd to ea6e831
Fixes: #5263
Adds a new config option `mode` to `PostgresWriterConfig`, which enables a `cdc` mode. The operation and timestamp columns are configurable; by default they are `__feldera_op` and `__feldera_ts`:

- `__feldera_op`: `i` for insert operations, `u` for upserts, or `d` for deletes.
- `__feldera_ts`: the UTC timestamp at which the record is serialized.

For simplicity, in `cdc` mode the insert, update, and delete queries are all the same insert query; this lets us reuse code and avoid refactors.
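The "one insert query for every operation" idea can be illustrated with a small sketch. This is not the query built in `prepared_statements.rs`, whose actual SQL may look quite different. `CdcOp`, `quote_ident`, and `cdc_insert_query` are hypothetical names introduced here, and the positional-parameter layout is an assumption. Only the `i`/`u`/`d` codes and the default column names come from the description above.

```rust
/// Operation codes described in the PR: `i` (insert), `u` (upsert), `d` (delete).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcOp {
    Insert,
    Upsert,
    Delete,
}

impl CdcOp {
    /// Value that would be written to the operation column.
    pub fn code(self) -> &'static str {
        match self {
            CdcOp::Insert => "i",
            CdcOp::Upsert => "u",
            CdcOp::Delete => "d",
        }
    }
}

/// Quote a PostgreSQL identifier, doubling any embedded double quotes.
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Build a single INSERT statement that serves inserts, upserts, and deletes
/// alike: the operation is carried as data in `op_column`, not in the SQL verb.
/// Placeholders are positional (`$1`, `$2`, ...), data columns first, then the
/// operation code, then the timestamp (an assumed ordering).
pub fn cdc_insert_query(
    table: &str,
    data_columns: &[&str],
    op_column: &str,
    ts_column: &str,
) -> String {
    let columns: Vec<String> = data_columns
        .iter()
        .copied()
        .chain([op_column, ts_column])
        .map(quote_ident)
        .collect();
    let placeholders: Vec<String> = (1..=columns.len()).map(|i| format!("${i}")).collect();
    format!(
        "INSERT INTO {} ({}) VALUES ({})",
        quote_ident(table),
        columns.join(", "),
        placeholders.join(", ")
    )
}
```

Because the statement text does not depend on the operation, one prepared statement can be reused for all three kinds of change; only the bound value for the operation column (`CdcOp::code`) differs per row.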