
Conversation

@swanandx
Contributor

Fixes #4592

Checklist

  • Documentation updated
  • Changelog updated

Breaking Changes?

Mark if you think the answer is yes for any of these components:

  • Others (specify)

Creates a new table in Postgres, so this should be considered breaking.

Describe Incompatible Changes

Added a new migration and updated the logic for setting the deployment status.

@snkas
Contributor

snkas commented Aug 18, 2025

Looks good, it has all the components; it just needs to be made slightly more general, plus a couple of fixes to fit into the current database implementation architecture.

@ryzhyk
Contributor

ryzhyk commented Aug 18, 2025

@swanandx , how do you anticipate pipeline crash and recovery to be represented in this new table?

@swanandx
Contributor Author

swanandx commented Aug 19, 2025

> @swanandx , how do you anticipate pipeline crash and recovery to be represented in this new table?

What do you mean by crash and recovery? If a pipeline crashes, it gets stopped, right? When does it recover?

When transitioning to Stopping, the runner tracks an error message (the reason for stopping), which we would write to the DB so we know why the pipeline was stopped / crashed.

We can see events like the following:

[Screenshot: table of recorded lifecycle events, one row per deployment status transition]

There is an info column, which records those errors (and additional info in the future). For example, the info for the Stopping and Stopped deployment statuses would be:

added panic in udf example: the recorded info is a JSON object with a human-readable message, an error_code of "RuntimeError.WorkerPanic", and structured details. The message renders as:

```
DBSP error: runtime error: One or more worker threads terminated unexpectedly
foreground worker thread 2 panicked
panic message: never gonna give you up
panic location: crates/feldera_pipe_pipeline_0198c1f9_d4d4_72c3_a7c7_cdc6cae9ae09_globals/src/udf.rs:8:5
stack trace:
   0: std::backtrace::Backtrace::create
   1: dbsp::circuit::runtime::Runtime::panic
   2: dbsp::circuit::runtime::panic_hook
   3: std::panicking::rust_panic_with_hook
   4: std::panicking::begin_panic_handler::{{closure}}
   5: std::sys::backtrace::__rust_end_short_backtrace
   6: __rustc::rust_begin_unwind
   7: core::panicking::panic_fmt
   8: feldera_pipe_pipeline_0198c1f9_d4d4_72c3_a7c7_cdc6cae9ae09_globals::udf::base64
   9: feldera_pipe_pipeline_0198c1f9_d4d4_72c3_a7c7_cdc6cae9ae09_globals::stubs::base64
  10: dbsp::mono::<impl dbsp::circuit::circuit_builder::Stream<dbsp::circuit::circuit_builder::ChildCircuit<()>,dbsp::typed_batch::TypedBatch<K,(),i64,dbsp::trace::ord::fallback::wset::FallbackWSet<dyn dbsp::dynamic::data::Data,dyn dbsp::dynamic::weight::WeightTyped+Type = i64>>>>::map::{{closure}}
  11: <minitrace::future::InSpan<T> as core::future::future::Future>::poll
  12: <dbsp::operator::dynamic::filter_map::MapZSet<CI,CO> as dbsp::circuit::operator_traits::UnaryOperator<CI,CO>>::eval::{{closure}}
  13: dbsp::circuit::operator_traits::UnaryOperator::eval_owned::{{closure}}
  14: <dbsp::circuit::circuit_builder::UnaryNode<C,I,O,Op> as dbsp::circuit::circuit_builder::Node>::eval::{{closure}}
  15: dbsp::circuit::schedule::dynamic_scheduler::Inner::spawn_task::{{closure}}
  16: tokio::runtime::task::core::Core<T,S>::poll
  17: tokio::runtime::task::harness::Harness<T,S>::poll
  18: tokio::task::local::LocalSet::tick
  19: std::thread::local::LocalKey<T>::with
  20: dbsp::circuit::circuit_builder::CircuitHandle::step::{{closure}}
  21: tokio::runtime::scheduler::current_thread::CurrentThread::block_on
  22: tokio::runtime::runtime::Runtime::block_on
  23: dbsp::circuit::circuit_builder::CircuitHandle::step
  24: std::sys::backtrace::__rust_begin_short_backtrace
  25: core::ops::function::FnOnce::call_once{{vtable.shim}}
  26: std::sys::pal::unix::thread::Thread::new::thread_start
  27: __pthread_cond_wait

foreground worker thread 6 panicked
(identical panic message, location, and stack trace)
```

The details field repeats the same information in structured form, as details.error.WorkerPanic.panic_info: one entry per panicked worker thread (2 and 6, both "foreground"), each carrying the backtrace above plus the location ({"col": 5, "file": "crates/feldera_pipe_pipeline_0198c1f9_d4d4_72c3_a7c7_cdc6cae9ae09_globals/src/udf.rs", "line": 8}) and the message "never gonna give you up".

@ryzhyk
Contributor

ryzhyk commented Aug 19, 2025

> @swanandx , how do you anticipate pipeline crash and recovery to be represented in this new table?
>
> What do you mean by crash and recovery? If a pipeline crashes, it gets stopped, right? When does it recover?
>
> When transitioning to Stopping, the runner tracks an error message (the reason for stopping), which we would write to the DB so we know why the pipeline was stopped / crashed.
>
> We can see events like the following:
> [Screenshot: table of recorded lifecycle events]
>
> There is an info column, which records those errors (and additional info in the future). For example, the info for the Stopping and Stopped deployment statuses would be the "added panic in udf example" JSON above.

When the pipeline runs with fault tolerance, it won't transition into the Stopped state on a failure. It may or may not become unavailable for some time, and eventually k8s will restart it in a new pod, where the pipeline will resume from a checkpoint. I think there are exceptions to this, e.g., the pipeline won't restart in the case of an OOM error. @snkas can confirm the exact behavior.

While this is meant to largely mask the failure from the user, it's important to notify them about the failure, providing as much information as possible about the cause.

@swanandx
Contributor Author

> While this is meant to largely mask the failure from the user, it's important to notify them about the failure, providing as much information as possible about the cause.

How do we get that info in the runner as of now (if we get it at all)?

@ryzhyk
Contributor

ryzhyk commented Aug 19, 2025

> While this is meant to largely mask the failure from the user, it's important to notify them about the failure, providing as much information as possible about the cause.
>
> How do we get that info in the runner as of now (if we get it at all)?

I'm not sure we do; I was hoping we could explore this as part of working on this issue. That's the main reason we started this in the first place: to log pipeline failures and restarts in a useful way.

@snkas
Contributor

snkas commented Aug 19, 2025

I would recommend keeping the database table and its associated functions quite generic; we can decide later what exactly constitutes an event. It might be best to keep it as generic as ids + timestamp + free text + maybe the deployment status at the time.
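For illustration, a minimal sketch of that generic shape (the column names here are hypothetical; the actual migration in this PR uses more specific status columns, as discussed in the review below):

```sql
-- Hypothetical generic shape: ids + timestamp + free text + status.
CREATE TABLE pipeline_lifecycle_events (
    event_id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id uuid NOT NULL,
    pipeline_id uuid NOT NULL,
    recorded_at timestamptz NOT NULL DEFAULT now(),
    deployment_status varchar, -- status at the time, if known
    info text                  -- free-text message / details
);
```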

@lalithsuresh
Contributor

> I would recommend keeping the database table and its associated functions quite generic; we can decide later what exactly constitutes an event. It might be best to keep it as generic as ids + timestamp + free text + maybe the deployment status at the time.

I agree with this; keep it freestyle for the most part.

@swanandx swanandx force-pushed the issue4592 branch 2 times, most recently from b9a8295 to 9d6af95 on August 20, 2025 at 11:13
@swanandx swanandx force-pushed the issue4592 branch 4 times, most recently from c6467f5 to 221ace3 on August 25, 2025 at 10:02
@swanandx swanandx marked this pull request as ready for review August 25, 2025 10:02
Copilot AI review requested due to automatic review settings August 25, 2025 10:02
Contributor

Copilot AI left a comment


Pull Request Overview

This PR implements pipeline lifecycle event tracking functionality in the pipeline manager. It adds a new database table to store historical events for pipeline deployment status changes and provides an API endpoint to retrieve these events.

  • Adds a new pipeline_lifecycle_events table to track deployment status changes
  • Implements API endpoint /v0/pipelines/{pipeline_name}/lifecycle_events to query events
  • Includes automatic event recording when pipeline deployment status transitions occur
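For illustration, fetching the most recent events might look like this (the pipeline name is a placeholder; max_events is the query parameter discussed in the review comments below):

```
GET /v0/pipelines/my-pipeline/lifecycle_events?max_events=20
```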

Reviewed Changes

Copilot reviewed 12 out of 13 changed files in this pull request and generated 3 comments.

Summary per file:

  • openapi.json: Adds the OpenAPI specification for the new lifecycle events endpoint
  • crates/pipeline-manager/src/db/test.rs: Test coverage for lifecycle events functionality and the model implementation
  • crates/pipeline-manager/src/db/storage_postgres.rs: PostgreSQL implementation with cleanup functionality for lifecycle events
  • crates/pipeline-manager/src/db/storage.rs: Storage trait definition for the lifecycle events interface
  • crates/pipeline-manager/src/db/operations/pipeline.rs: Database operations for storing and retrieving lifecycle events
  • crates/pipeline-manager/src/api/mod.rs: Module registration for the lifecycle events API
  • crates/pipeline-manager/src/api/main.rs: API endpoint registration and schema definitions
  • crates/pipeline-manager/src/api/lifecycle_events.rs: Data structure definition for lifecycle events
  • crates/pipeline-manager/src/api/endpoints/pipeline_interaction/mod.rs: Module organization for the lifecycle events endpoint
  • crates/pipeline-manager/src/api/endpoints/pipeline_interaction/lifecycle_events.rs: REST API implementation of the lifecycle events endpoint
  • crates/pipeline-manager/proptest-regressions/db/test.txt: Property test regression for lifecycle events
  • crates/pipeline-manager/migrations/V27__add_pipeline_lifecycle_history.sql: Database migration creating the lifecycle events table

Contributor

@snkas snkas left a comment


Looks good, a couple more comments, but everything is there now. Great job on adding it to the database tests; one more addition is needed there. I'll try it out locally afterward for the final review, likely with some minor comments on spelling and log messages.

@swanandx swanandx force-pushed the issue4592 branch 4 times, most recently from 674456f to 86debe7 on August 28, 2025 at 11:56
@swanandx swanandx requested a review from snkas August 28, 2025 11:56
Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
Signed-off-by: Swanand Mulay <73115739+swanandx@users.noreply.github.com>
Signed-off-by: feldera-bot <feldera-bot@feldera.com>
Contributor

@snkas snkas left a comment


Thanks for adjusting it after the status rework; looks good. I made mostly minor comments, though it would be good to bring storage and program status into the fold as well.

event_id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
pipeline_id uuid NOT NULL,
tenant_id uuid NOT NULL,
deployment_resources_status varchar NOT NULL,

Also: deployment_resources_desired_status

.await?;

Ok(HttpResponse::Ok()
.content_type("application/json")

I think .json() already sets the content type by itself.
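For reference, a minimal sketch of the suggestion: actix-web's .json() both serializes the body and sets the Content-Type header, so the explicit .content_type(...) call is redundant (the events type here is a stand-in):

```rust
use actix_web::HttpResponse;

// .json() sets `Content-Type: application/json` itself.
fn ok_json(events: Vec<serde_json::Value>) -> HttpResponse {
    HttpResponse::Ok().json(events)
}
```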

/// Query parameters for lifecycle events endpoint.
#[derive(serde::Deserialize)]
pub struct EventsParameters {
pub max_events: u32,

Default? I think last 20 makes sense.
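A sketch of what that default could look like with serde (the helper function name is illustrative):

```rust
use serde::Deserialize;

// Used when ?max_events= is omitted from the query string.
fn default_max_events() -> u32 {
    20
}

/// Query parameters for the lifecycle events endpoint.
#[derive(Deserialize)]
pub struct EventsParameters {
    #[serde(default = "default_max_events")]
    pub max_events: u32,
}
```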

///
/// # Parameters
/// - `pipeline_name`: Unique pipeline name (path parameter)
/// - `max_events`: Maximum number of events to return (query parameter, required)

Mention default.

/// # Returns
/// - 200 OK: List of lifecycle events for the pipeline
/// - 404 NOT_FOUND: Pipeline with that name does not exist
/// - 503 SERVICE_UNAVAILABLE: Disconnected or timeout during response

I don't think this specific 503 can happen; nothing is interacted with over HTTP here, is it?

.is_ok_and(|e| e.is_empty()));

// "Compile" the program of pipeline1
assert_eq!(

Hmm, I think compilation and storage status are also interesting, as they are part of the pipeline lifecycle: https://docs.feldera.com/pipelines/lifecycle. Adding program_status and storage_status is worthwhile, I think.

let model_response = model.get_pipeline_lifecycle_events(tenant_id, &pipeline_name, max_events).await;
let impl_response = handle.db.get_pipeline_lifecycle_events(tenant_id, &pipeline_name, max_events).await;

// remove event_id, recorded_at as they can't be same in db and model

You could make a custom check_responses that does this comparison; that would be in line with how it is handled by the others, where for instance the timestamp is an issue.
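A hypothetical shape for such a helper, assuming a LifecycleEvent struct with event_id (uuid) and recorded_at (chrono timestamp) fields as in this PR; the exact types are assumptions:

```rust
use chrono::{DateTime, Utc};
use uuid::Uuid;

// Compare model and implementation responses while ignoring the fields
// that can never match: event_id is freshly generated on each side, and
// recorded_at is a wall-clock timestamp.
fn check_lifecycle_event_responses(
    mut model: Vec<LifecycleEvent>,
    mut imp: Vec<LifecycleEvent>,
) {
    for event in model.iter_mut().chain(imp.iter_mut()) {
        event.event_id = Uuid::nil();
        event.recorded_at = DateTime::<Utc>::MIN_UTC;
    }
    assert_eq!(model, imp);
}
```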

GetPipelineLifecycleEvents(
TenantId,
#[proptest(strategy = "limited_pipeline_name()")] String,
u32,

I think having a strategy here to provide possible values, especially 0, would be useful.
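For instance, a small range strategy would guarantee that 0 and other edge values are exercised often (the upper bound is arbitrary):

```rust
GetPipelineLifecycleEvents(
    TenantId,
    #[proptest(strategy = "limited_pipeline_name()")] String,
    // Small range so that 0 comes up frequently.
    #[proptest(strategy = "0u32..=10")] u32,
),
```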

}

// calls cleanup_pipeline_lifecycle_events every interval
pub async fn regular_cleanup_pipeline_lifecycle_events(

This should be moved into src/ in its own file, to not confuse it with a storage operation.
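A sketch of that periodic loop living in its own module, assuming a storage handle that exposes the cleanup method from this PR (the handle type and logging macro are stand-ins):

```rust
use std::sync::Arc;
use std::time::Duration;

// Runs forever, invoking the lifecycle-event cleanup once per interval.
pub async fn regular_cleanup_pipeline_lifecycle_events(
    db: Arc<StoragePostgres>,
    period: Duration,
) {
    let mut ticker = tokio::time::interval(period);
    loop {
        ticker.tick().await;
        if let Err(e) = db.cleanup_pipeline_lifecycle_events().await {
            log::error!("failed to clean up pipeline lifecycle events: {e}");
        }
    }
}
```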

tenant_id uuid NOT NULL,
deployment_resources_status varchar NOT NULL,
deployment_runtime_status varchar,
deployment_runtime_desired_status varchar,

storage_status
program_status
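Taken together with the earlier comment, the requested additions to the migration might look like this; shown as a sketch (in the PR they would be part of the V27 CREATE TABLE, and the nullability is an assumption):

```sql
ALTER TABLE pipeline_lifecycle_events
    ADD COLUMN deployment_resources_desired_status varchar,
    ADD COLUMN storage_status varchar,
    ADD COLUMN program_status varchar;
```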

@blp
Member

blp commented Nov 14, 2025

@snkas Does this need a round of review now you're back?

@snkas
Contributor

snkas commented Nov 14, 2025

It needs to be adjusted for post-pipeline-rework Feldera.

I'll coordinate the work with @swanandx .

My envisioned goal is that we use the same strategy for cluster status history and pipeline status history.


Development

Successfully merging this pull request may close these issues.

[pipeline-manager] track pipeline state transitions in postgres
