Memory experiment #2314
-
One should not feed more data in a step than the LATENESS bound. Clearly, this changes the semantics of the program.
-
I think the basic conclusion from my findings is that we should keep steps small.
-
I also think that there's a difference between backfill and real-time operation. Perhaps the connectors should understand this difference. In backfill, full buffers will mostly be what triggers processing.
-
Moreover, I think you also made the point at some other time that input batches need to be sorted. Since sorting is O(n log n), the bigger the batches, the more time is spent sorting. This sorting is often useless if the data will be indexed again later in the pipeline. Perhaps we can make DBSP specify that some batches do not need to be sorted?
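A rough back-of-the-envelope on that sorting cost (my own illustration, not a measurement from the experiment): feeding $N$ records in $k$ equal steps means each step sorts $N/k$ records, so

$$
\underbrace{\tfrac{N}{k}\log\tfrac{N}{k}}_{\text{per-step sort cost}}
\quad\text{and}\quad
\underbrace{N\log\tfrac{N}{k}}_{\text{total over }k\text{ steps}}
\quad\text{vs.}\quad
\underbrace{N\log N}_{\text{one giant batch}}
$$

The total comparison count shrinks only by about $N\log k$, but the sort time inside any single step drops by roughly a factor of $k$, which is what matters for per-step latency; skipping the sort entirely when the data is re-indexed downstream would save all of it.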
-
This will make a great blog post
-
And yes, dynamic control + maybe a bit of compiler support will be the way to go
-
The pipeline here is in a state of overload, with the arrival rate higher than the service rate, which means something somewhere has to either a) queue the requests (so e2e latencies grow unbounded) or b) drop requests (to keep e2e latencies bounded). You can indeed keep step sizes small to bound per-step latency; that helps with memory usage around GC/merges etc. and sounds like the correct way to degrade gracefully. But you cannot avoid the buffering in front of it (either growing sizes for "buffered" or growing lag for data sources like Kafka).
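To put rough numbers on the overload point (illustrative figures, not measurements from this pipeline): with arrival rate $\lambda$ and service rate $\mu$, the backlog grows linearly no matter how the work is chopped into steps:

$$
\text{backlog}(t) \approx (\lambda - \mu)\,t,
\qquad\text{e.g. } \lambda = 200\text{k rec/s},\; \mu = 150\text{k rec/s}
\;\Rightarrow\; 50\text{k buffered records per second}.
$$

Small steps bound per-step latency and the memory spikes around GC/merges, but that backlog has to live somewhere (in-process buffers or Kafka lag) until the service rate catches up or data is dropped.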
-
(Outside observer here.) It sounds like there needs to be a generic backpressure mechanism rather than hardcoded limits/buffers. See https://lucumr.pocoo.org/2020/1/1/async-pressure/ and various posts/papers on the subject.
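A minimal sketch of what a generic backpressure mechanism looks like in Rust (illustrative only, not Feldera's API): a bounded channel makes the producer block once the consumer falls behind, instead of letting buffers or end-to-end latency grow without bound.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

fn main() {
    // Bounded to 1,024 in-flight records: `send` blocks when the bound is hit,
    // which is the backpressure signal propagating to the producer.
    let (tx, rx) = sync_channel::<u64>(1024);

    let producer = thread::spawn(move || {
        for record in 0..100_000u64 {
            tx.send(record).unwrap(); // blocks while the consumer is behind
        }
    });

    for _record in rx {
        // A deliberately slow consumer; the producer's rate is forced down to match.
        thread::sleep(Duration::from_micros(10));
    }

    producer.join().unwrap();
}
```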
-
There are two kinds of streaming sources: hot and cold. The hot ones do not support backpressure. Internally we need backpressure, indeed. There's also the question of jitter: the buffers are useful to average out the throughput of a jittery source.
-
In this case we are dealing with a cold source, so we should be able to do better.
-
As proof, I can reliably crash the Nexmark Q9 pipeline on my laptop with 16 workers and an input batch size of 100K.
-
High memory use
Large input batches cause high memory use because:
- GC does not work as well because it only takes effect in the next step. That is, if step 1 raises the waterline to a given level, data below that level can only be deleted when we get to step 2. If we divide the same amount of input into 10x as many steps, then there is much more opportunity for the merger to GC away data in the process.
- We only merge batches of similar size, so if there's one very large batch we have to wait for another one before doing GC on it.
- Merges of very large batches take longer, and merging batches of size A and B takes temporary extra space of up to A+B (see the merge sketch after this list).
- Large input batches take more memory than small ones, both in terms of the raw transport and the parsed data fed to the circuit.
- Large input batches use more memory for intermediate processing through the pipeline.
- Processing a large input batch takes longer than processing a small one, which gives more time for a large number of records to build up in the buffers, which in turn causes the next input batch to be a big one. This also makes it more likely that we'll need to pause the transport (which Kafka handles badly, so we prefer to just wait and let even more data build up in the buffers in the hope that we can clear the backlog).
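As a concrete illustration of the A+B point (a simplified sketch, not DBSP's actual merge implementation): a two-way merge of sorted batches has to build the merged output before the inputs can be dropped, so at its peak it holds both inputs plus an output of capacity A+B.

```rust
/// Sketch of a two-way merge of sorted batches. While it runs, memory holds
/// both inputs *and* the output being built, i.e. up to A + B extra entries.
fn merge<T: Ord + Clone>(a: &[T], b: &[T]) -> Vec<T> {
    let mut out = Vec::with_capacity(a.len() + b.len()); // the A + B of temporary space
    let (mut i, mut j) = (0, 0);
    while i < a.len() && j < b.len() {
        if a[i] <= b[j] {
            out.push(a[i].clone());
            i += 1;
        } else {
            out.push(b[j].clone());
            j += 1;
        }
    }
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}
```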
Demonstration
Consider our customer demo. On my system:
- With 16 workers, it peaks at about 6 GiB RAM.
- With 8 workers, it peaks at about 35 GiB RAM.
The main difference is that 16 workers can process events as fast as
they arrive, which keeps input batches small. With 8 workers, they
pile up as described above.
We can visualize the behavior of each one. The following graph shows the behavior with 16 workers. The buffered records stay near zero the whole time, the input records increase linearly, and the processed records follow closely behind. Memory initially increases superlinearly, but merging catches up and reduces it (one can see two distinct inflection points):
The graph for 8 workers is very different. Input records arrive at the same linear rate, but since processing cannot keep up, buffered records increase during each step to a new, higher peak (it never reaches the default `max_buffered_records` of 1,000,000 only because this data set has about 175,000 records). Thus, steps get bigger and bigger and slower and slower. Memory also increases steadily, partly because the merger can only reduce memory use during a step, and the few steps mean that there are few opportunities to do so. The final step is smaller because input is exhausted, and by then some memory can be released by the merger as well.
Analysis
We can separate the memory use into different effects:
- Eliminate effects from Kafka buffering, by using the file connector instead.
- Separate memory used for intermediate processing from memory for buffering input, by running them at mutually exclusive times:
  - Pausing the connectors when a step is running. This requires a patch.
  - Start a step only when the buffers have accumulated a specific number of records. For example, with `min_batch_size_records` of 50,000 and `max_buffering_delay_usecs` of 10,000,000, the pipeline will wait up to 10 seconds to buffer 50,000 records, and since it takes less than 10 seconds to do that, each step will have (approximately) 50,000 records. (A sketch of this trigger logic follows this list.)
- Eliminate effects from background merging, by merging eagerly and completely into a single batch whenever a new batch is inserted into a spine. Similarly, whenever the spine's filters change (which are how GC is implemented), the merger immediately frees all the records that are no longer needed. This requires a patch to add an "eager merger".
- Show the GC cost of large batches by making the final batch read from the file very small, so that memory use drops when GC frees everything up to the near-final waterline. This requires a patch, too.
- Run without storage, so that memory allocated for the storage cache does not muddy the problem.
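A sketch of that step-trigger logic (hypothetical code, not the actual implementation; the field names simply mirror the `min_batch_size_records` and `max_buffering_delay_usecs` settings): a step starts once enough records have buffered or the buffering delay expires, whichever comes first.

```rust
use std::time::{Duration, Instant};

/// Hypothetical sketch of the buffering policy: trigger a step when either
/// the record threshold or the buffering-delay deadline is reached.
struct StepTrigger {
    min_batch_size_records: u64,
    max_buffering_delay: Duration,
    buffered_records: u64,
    buffering_since: Instant,
}

impl StepTrigger {
    fn should_start_step(&self) -> bool {
        self.buffered_records >= self.min_batch_size_records
            || self.buffering_since.elapsed() >= self.max_buffering_delay
    }
}

fn main() {
    let trigger = StepTrigger {
        min_batch_size_records: 50_000,
        max_buffering_delay: Duration::from_micros(10_000_000),
        buffered_records: 0,
        buffering_since: Instant::now(),
    };
    // With the demo's settings, a step starts after 50,000 records or 10 seconds.
    assert!(!trigger.should_start_step());
}
```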
If we run the same demo with these changes, we get the following
behavior (graphed below):
- Buffered records build up in each step until 50,000 accumulate, except for the second-to-last step where there are only about 25,000, and the last step (not visible due to resolution) where there is only 1.
- Input records increase at the same rate as buffered records (but without the drops).
- While records build up in the buffers, parsing takes place but no other processing. Memory increases steadily but relatively slowly, in exact lockstep.
- When 50,000 records accumulate or the 10-second timer expires (for the last two steps), buffering pauses and a step begins. There are only 5 steps:
  - During the first step (6-13 seconds), memory use shoots up very quickly, dropping slightly at the end of the step.
  - The second step (21-29 seconds) also increases memory consumption from start to finish, peaking higher than it finishes.
  - The third step (37-44 seconds) peaks still higher but finishes just above the second step's final memory consumption, which suggests that the steady-state memory consumption for 50,000-record steps in this demo is about 10 GB.
  - The fourth step (55-60 seconds) is about half the size of the earlier ones. Memory consumption drops sharply because GC eliminated just as much data as in steps 2 and 3 and only half as much new data was added.
  - The fifth step (69-70 seconds) is a single record. Memory consumption drops sharply again, about as much as in the fourth step, because GC eliminated the data introduced in step 4 without adding a significant new amount.
- The memory usage at the end of each step is the minimum possible given the amount of GC allowed, since the merger eagerly merges and discards all GCable data. At the end of the first step, no GC at all is allowed.