pipeline-manager: track pipeline lifecycle history #4593
crates/pipeline-manager/migrations/V27__add_pipeline_lifecycle_history.sql
Looks good, it has all the components. It just needs to be made slightly more general, plus a couple of fixes to fit into the current database implementation architecture.
@swanandx, how do you anticipate pipeline crash and recovery being represented in this new table?
What do you mean by crash and recovery? If a pipeline crashes, it gets stopped, right? When does it recover? When transitioning to stopping, the runner tracks an error message (the reason for stopping), which we would write to the db to know why it was stopped or crashed. We can see the events like:
In this example, a panic was added in a UDF.
When the pipeline runs with fault tolerance, it won't transition into Stopped state on a failure. It may or may not become unavailable for some time, and eventually k8s will restart it in a new pod, where the pipeline will resume from a checkpoint. I think there are exceptions to this, e.g., the pipeline won't restart in case of an OOM error. @snkas can confirm the exact behavior. While this is meant to largely mask the failure from the user, it's important to notify them about the failure, providing as much information as possible about the cause.
How do we get that info in the runner as of now (if we get it at all)?
I'm not sure we do, I was hoping we could explore this as part of working on this issue. This is the main reason we started this in the first place, to log pipeline failures and restarts in a useful way.
I would recommend keeping the database table and its associated functions quite generic, then we can later on decide further what constitutes an event. It might be best to keep it as generic as ids + timestamp + free text + maybe deployment status at the time.
I agree with keeping it freestyle for the most part.
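The generic shape suggested above (ids + timestamp + free text + optional deployment status) could look roughly like the following std-only sketch. The struct, its field names, the `event` helper, and `latest_events` are hypothetical illustrations, not the PR's schema or API; plain integers stand in for the uuid columns.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical generic event record: ids + timestamp + free text + optional status.
/// Field names and types are illustrative; the PR's table uses uuid columns.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
struct LifecycleEvent {
    event_id: u64,
    pipeline_id: u64,
    tenant_id: u64,
    recorded_at: SystemTime,
    /// Deployment status at the time of the event, if known.
    deployment_status: Option<String>,
    /// Free-text description, e.g. an error message.
    info: String,
}

/// Helper that builds an event recorded `secs` seconds after the Unix epoch.
fn event(event_id: u64, pipeline_id: u64, secs: u64, status: Option<&str>, info: &str) -> LifecycleEvent {
    LifecycleEvent {
        event_id,
        pipeline_id,
        tenant_id: 1,
        recorded_at: UNIX_EPOCH + Duration::from_secs(secs),
        deployment_status: status.map(str::to_string),
        info: info.to_string(),
    }
}

/// Returns at most `max_events` events of one pipeline, newest first.
fn latest_events(events: &[LifecycleEvent], pipeline_id: u64, max_events: usize) -> Vec<LifecycleEvent> {
    let mut matching: Vec<LifecycleEvent> = events
        .iter()
        .filter(|e| e.pipeline_id == pipeline_id)
        .cloned()
        .collect();
    // Sort descending by timestamp so the most recent event comes first.
    matching.sort_by(|a, b| b.recorded_at.cmp(&a.recorded_at));
    matching.truncate(max_events);
    matching
}

fn main() {
    // Status strings and messages below are made-up sample data.
    let log = vec![
        event(1, 7, 10, Some("Provisioning"), "start requested"),
        event(2, 7, 20, Some("Stopping"), "panic in UDF"),
        event(3, 8, 15, None, "unrelated pipeline"),
    ];
    for e in latest_events(&log, 7, 20) {
        println!("{:?} {:?} {}", e.recorded_at, e.deployment_status, e.info);
    }
}
```

Keeping the status optional and the description free-form leaves room to decide later what counts as an event, for example a restart after a crash.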
b9a8295 to 9d6af95
c6467f5 to 221ace3
Pull Request Overview
This PR implements pipeline lifecycle event tracking functionality in the pipeline manager. It adds a new database table to store historical events for pipeline deployment status changes and provides an API endpoint to retrieve these events.
- Adds a new `pipeline_lifecycle_events` table to track deployment status changes
- Implements API endpoint `/v0/pipelines/{pipeline_name}/lifecycle_events` to query events
- Includes automatic event recording when pipeline deployment status transitions occur
Reviewed Changes
Copilot reviewed 12 out of 13 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| openapi.json | Adds OpenAPI specification for the new lifecycle events endpoint |
| crates/pipeline-manager/src/db/test.rs | Comprehensive test coverage for lifecycle events functionality and model implementation |
| crates/pipeline-manager/src/db/storage_postgres.rs | PostgreSQL implementation with cleanup functionality for lifecycle events |
| crates/pipeline-manager/src/db/storage.rs | Storage trait definition for lifecycle events interface |
| crates/pipeline-manager/src/db/operations/pipeline.rs | Database operations for storing and retrieving lifecycle events |
| crates/pipeline-manager/src/api/mod.rs | Module registration for lifecycle events API |
| crates/pipeline-manager/src/api/main.rs | API endpoint registration and schema definitions |
| crates/pipeline-manager/src/api/lifecycle_events.rs | Data structure definition for lifecycle events |
| crates/pipeline-manager/src/api/endpoints/pipeline_interaction/mod.rs | Module organization for lifecycle events endpoint |
| crates/pipeline-manager/src/api/endpoints/pipeline_interaction/lifecycle_events.rs | REST API implementation for lifecycle events endpoint |
| crates/pipeline-manager/proptest-regressions/db/test.txt | Property test regression for lifecycle events |
| crates/pipeline-manager/migrations/V27__add_pipeline_lifecycle_history.sql | Database migration to create lifecycle events table |
snkas left a comment
Looks good, couple more comments, but everything is there now. Great job on adding it to the database tests, one more addition needed there. I'll try it out locally afterward for final review, likely with some minor spelling and log comments.
674456f to 86debe7
Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
snkas left a comment
Thanks for adjusting it after the status rework, looks good. I made mostly minor comments, though it would be good to bring storage and program status also into the fold.
    event_id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
    pipeline_id uuid NOT NULL,
    tenant_id uuid NOT NULL,
    deployment_resources_status varchar NOT NULL,
Also: deployment_resources_desired_status
        .await?;

    Ok(HttpResponse::Ok()
        .content_type("application/json")
I think .json() already sets the content type by itself.
    /// Query parameters for lifecycle events endpoint.
    #[derive(serde::Deserialize)]
    pub struct EventsParameters {
        pub max_events: u32,
Default? I think last 20 makes sense.
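One way to read this suggestion is to make `max_events` optional and fall back to 20 when it is absent. The sketch below is std-only and hypothetical: the `Option<u32>` field, `DEFAULT_MAX_EVENTS`, and `from_query_value` are illustrative names, and in the actual handler the query string would be deserialized by the web framework, not parsed by hand.

```rust
use std::num::ParseIntError;

/// Assumed default, taken from the review suggestion ("last 20").
const DEFAULT_MAX_EVENTS: u32 = 20;

/// Hypothetical stand-in for the endpoint's query parameters.
struct EventsParameters {
    /// `None` means the client did not pass `max_events`.
    max_events: Option<u32>,
}

impl EventsParameters {
    /// Builds the parameters from the raw `max_events` query value, if any.
    /// A present but non-numeric value is reported as an error.
    fn from_query_value(raw: Option<&str>) -> Result<Self, ParseIntError> {
        let max_events = raw.map(|s| s.parse::<u32>()).transpose()?;
        Ok(EventsParameters { max_events })
    }

    /// Effective limit: the requested value, or the default when omitted.
    fn max_events(&self) -> u32 {
        self.max_events.unwrap_or(DEFAULT_MAX_EVENTS)
    }
}

fn main() {
    let omitted = EventsParameters::from_query_value(None).expect("no value to parse");
    let explicit = EventsParameters::from_query_value(Some("5")).expect("valid number");
    println!("omitted -> {}, explicit -> {}", omitted.max_events(), explicit.max_events());
}
```

An explicit `0` stays `0` in this sketch, so a caller can still ask for an empty result.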
    ///
    /// # Parameters
    /// - `pipeline_name`: Unique pipeline name (path parameter)
    /// - `max_events`: Maximum number of events to return (query parameter, required)
Mention default.
    /// # Returns
    /// - 200 OK: List of lifecycle events for the pipeline
    /// - 404 NOT_FOUND: Pipeline with that name does not exist
    /// - 503 SERVICE_UNAVAILABLE: Disconnected or timeout during response
I don't think this specific 503 can happen, since this endpoint doesn't interact with anything over HTTP?
        .is_ok_and(|e| e.is_empty()));

    // "Compile" the program of pipeline1
    assert_eq!(
Hmm I think compilation and storage status are also interesting, as they are part of the pipeline lifecycle: https://docs.feldera.com/pipelines/lifecycle . Adding program_status and storage_status I think is worthwhile.
    let model_response = model.get_pipeline_lifecycle_events(tenant_id, &pipeline_name, max_events).await;
    let impl_response = handle.db.get_pipeline_lifecycle_events(tenant_id, &pipeline_name, max_events).await;

    // remove event_id, recorded_at as they can't be same in db and model
You can make a custom check_responses that does this comparison, that would be in line with how it is handled by the others where for instance timestamp is an issue.
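A dedicated comparison of this kind might look like the sketch below, which treats two responses as equal when they agree on everything except `event_id` and `recorded_at`. The `Event` struct, the `String` error type, and `check_response_events` are hypothetical stand-ins, not the signatures used by the existing `check_responses` helpers in the test module.

```rust
/// Hypothetical, simplified event; the real type has more fields.
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
struct Event {
    event_id: u64,    // generated independently by model and database
    recorded_at: u64, // timestamp, also differs between the two
    pipeline_id: u64,
    deployment_resources_status: String,
}

/// Projects an event onto the fields that must match between model and database.
fn comparable(e: &Event) -> (u64, &str) {
    (e.pipeline_id, e.deployment_resources_status.as_str())
}

/// Returns true when both responses are errors with the same message, or both
/// are event lists that are equal after ignoring `event_id` and `recorded_at`.
fn check_response_events(
    model: &Result<Vec<Event>, String>,
    db: &Result<Vec<Event>, String>,
) -> bool {
    match (model, db) {
        (Ok(m), Ok(d)) => {
            m.len() == d.len() && m.iter().zip(d).all(|(a, b)| comparable(a) == comparable(b))
        }
        (Err(a), Err(b)) => a == b,
        _ => false,
    }
}

fn main() {
    let model = Ok(vec![Event {
        event_id: 1,
        recorded_at: 100,
        pipeline_id: 7,
        deployment_resources_status: "Stopped".to_string(),
    }]);
    let db = Ok(vec![Event {
        event_id: 99,
        recorded_at: 250,
        pipeline_id: 7,
        deployment_resources_status: "Stopped".to_string(),
    }]);
    println!("responses match: {}", check_response_events(&model, &db));
}
```

Comparing a projection leaves both responses untouched, so a failing case can still be printed in full.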
    GetPipelineLifecycleEvents(
        TenantId,
        #[proptest(strategy = "limited_pipeline_name()")] String,
        u32,
I think having a strategy here that provides the possible values, especially 0, would be useful.
    }

    // calls cleanup_pipeline_lifecycle_events every interval
    pub async fn regular_cleanup_pipeline_lifecycle_events(
This should be moved into src/ in its own file, to not confuse it with a storage operation.
    tenant_id uuid NOT NULL,
    deployment_resources_status varchar NOT NULL,
    deployment_runtime_status varchar,
    deployment_runtime_desired_status varchar,
storage_status
program_status
@snkas Does this need a round of review now that you're back?
It needs to be adjusted for post-pipeline-rework Feldera. I'll coordinate the work with @swanandx. My envisioned goal is that we use the same strategy for cluster status history and pipeline status history.
Fixes #4592
Checklist
Breaking Changes?
Mark if you think the answer is yes for any of these components:
Creates a new table in Postgres, so it should be considered breaking.
Describe Incompatible Changes
Added a new migration and updated the logic for setting the deployment status.