Atlas Stream Processing windows are aggregation pipeline stages that capture time-bounded subsets of a data stream, allowing you to perform operations that require finite inputs on streaming data.
Consider the example stream processor described here. The $match stage can operate directly on the stream of data pulled in by $source, checking each document against the match criteria as the stream processor ingests it. By contrast, the $group stage and the various statistical computations contained within it cannot operate on unbounded data, as it is impossible to determine minimum, maximum, average, or median values without first bounding the set of values to consider. Many non-mathematical operators such as $push and $top also require bounded data.
A stream processor provides these bounds with a window. A window opens, and all documents that the stream processor ingests accumulate in that window's state until a predefined interval of time elapses and the window closes. The window batches all documents it captures during that interval, and passes this set through its internal pipeline. From within this pipeline, the batched documents are indistinguishable from data-at-rest.
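The difference between per-document and bounded operations can be illustrated with a small Python simulation. This is only a sketch of the idea, not how Atlas Stream Processing is implemented: the `sensor_stream` source, the `temp` field, and the integer-second timestamps are all hypothetical, and windows are assumed to be half-open intervals aligned to time 0.

```python
from itertools import count, islice
from statistics import mean

def sensor_stream():
    """Hypothetical unbounded source: one reading per second, forever."""
    for second in count():
        yield {"ts": second, "temp": 20 + (second % 7)}

def match(stream, predicate):
    """Per-document filter: needs no bounds, so it works on an endless stream."""
    return (doc for doc in stream if predicate(doc))

def window_batches(stream, size_seconds):
    """Batch documents into consecutive, non-overlapping time windows.

    Assumes documents arrive in timestamp order. Empty windows are skipped.
    """
    batch, window_end = [], size_seconds
    for doc in stream:
        while doc["ts"] >= window_end:  # the current window has closed
            if batch:
                yield batch             # release the bounded batch
            batch, window_end = [], window_end + size_seconds
        batch.append(doc)

# Filtering is lazy and never needs the stream to end.
hot = match(sensor_stream(), lambda d: d["temp"] >= 22)

# An average only becomes computable once a window bounds the input.
first_two = list(islice(window_batches(hot, 5), 2))
averages = [mean(d["temp"] for d in batch) for batch in first_two]
```

Calling `mean` directly on `hot` would never return, because the generator never ends. Inside each batch, the documents behave like an ordinary finite list.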
Atlas Stream Processing provides support for Tumbling Windows, Hopping Windows, and Session Windows.
Tumbling Windows
Tumbling windows are windows defined entirely by the time intervals they capture. These time intervals don't overlap.
Example
You define a tumbling window with an interval of 3 seconds. When you start your stream processor:
A window opens for 3 seconds.
The first window captures all documents that the stream generates within those 3 seconds.
After 3 seconds elapse, the window closes and applies your aggregation logic to all the documents in that window.
If you configure allowedLateness, Atlas Stream Processing writes late-arriving messages to the Dead Letter Queue after the window closes.
A new window opens as soon as the first one closes and captures documents from the stream for the next 3 seconds.
Tumbling windows ensure comprehensive capture of data streams without repeated processing of individual documents.
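As a rough illustration of why each document lands in exactly one tumbling window, the assignment can be sketched in Python. The function names, the `ts` and `value` fields, and the choice of half-open windows aligned to time 0 are assumptions made for this example. Late-arriving data and the Dead Letter Queue are not modeled.

```python
def tumbling_window(ts, size):
    """Return the [start, end) bounds of the single window that contains ts."""
    start = (ts // size) * size
    return (start, start + size)

def run_tumbling(docs, size):
    """Group documents by window, then apply a sum to each closed window."""
    windows = {}
    for doc in docs:
        windows.setdefault(tumbling_window(doc["ts"], size), []).append(doc["value"])
    return {bounds: sum(values) for bounds, values in sorted(windows.items())}

# Hypothetical documents: timestamps in seconds since the processor started.
docs = [{"ts": t, "value": v} for t, v in [(0, 1), (1, 2), (2, 3), (3, 4), (5, 5), (6, 6)]]
totals = run_tumbling(docs, 3)
```

With a 3-second interval, the six documents fall into three windows and each value is summed exactly once.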
Hopping Windows
Hopping windows are windows defined by the time interval they capture and the interval between opening each window, called the hop. Since duration is decoupled from frequency, you can configure hopping windows to overlap or to be spaced apart.
To define a hopping window with overlap, set a hop smaller than the interval.
Example
You define a hopping window with an interval of 20 seconds and a hop of 5 seconds. When you start your stream processor:
A window opens for 20 seconds.
The first window captures all documents that the stream generates within those 20 seconds.
5 seconds later, another window opens and captures all documents within the next 20 seconds. Because the first window is still open, all documents that the stream generates for the next 15 seconds are captured by both windows.
20 seconds after the first window opens, it closes and applies your aggregation logic to all the documents in that window.
5 seconds later, the second window closes and applies your aggregation logic to all the documents in that window, including those that were already subject to aggregation logic in the first window.
If you configure allowedLateness, Atlas Stream Processing writes late-arriving messages to the Dead Letter Queue after the window closes.
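The overlap in the example above can be checked with a short Python sketch that lists every window containing a given timestamp. The function name and the assumption that windows open at 0, hop, 2 × hop, and so on as half-open intervals are illustrative choices, not a description of the service internals.

```python
def hopping_windows(ts, size, hop):
    """Return every [start, end) window that contains ts.

    Assumes windows open at 0, hop, 2 * hop, ... and each stays open for size.
    """
    windows = []
    k = ts // hop  # index of the most recently opened window at time ts
    while k >= 0 and k * hop + size > ts:
        windows.append((k * hop, k * hop + size))
        k -= 1
    return sorted(windows)

# Interval of 20 seconds, hop of 5 seconds, as in the example above.
only_first = hopping_windows(3, 20, 5)    # before the second window opens
both = hopping_windows(7, 20, 5)          # first and second windows overlap
steady_state = hopping_windows(22, 20, 5)
```

A document at second 3 belongs only to the first window, while a document at second 7 is captured by both the first and second windows. Once enough windows have opened, each document belongs to size / hop = 4 windows at the same time.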
To define a hopping window with spacing, set a hop larger than the interval.
Example
You define a hopping window with an interval of 3 seconds and a hop of 5 seconds. When you start a stream processor:
A window opens for 3 seconds.
The first window captures all documents for the next 3 seconds.
After 3 seconds elapse, the window closes and applies your aggregation logic to all the documents in that window.
The next window opens after a further 2 seconds elapse.
Atlas Stream Processing does not process any documents that the stream generates during those 2 seconds.
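The gap between spaced windows can be sketched the same way. The following Python snippet is an illustration under stated assumptions: the function name is hypothetical, the hop is at least as large as the interval, and windows are half-open intervals that open at 0, hop, 2 × hop, and so on.

```python
def captured_by_spaced_window(ts, size, hop):
    """Return the [start, end) window holding ts, or None if ts falls in a gap.

    Assumes hop >= size and windows opening at 0, hop, 2 * hop, ...
    """
    start = (ts // hop) * hop
    return (start, start + size) if ts - start < size else None

# Interval of 3 seconds, hop of 5 seconds, as in the example above.
timestamps = [0, 1, 2, 3, 4, 5, 7, 8, 9, 10]
processed = [t for t in timestamps if captured_by_spaced_window(t, 3, 5) is not None]
skipped = [t for t in timestamps if captured_by_spaced_window(t, 3, 5) is None]
```

Documents with timestamps in the last 2 seconds of every 5-second hop end up in `skipped`, because no window is open to capture them.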
Session Windows
Session windows are windows that allow you to run a pipeline on each "session" of activity in an input stream. Two documents are in the same session if they have the same partition, and the difference of their timestamps is less than the session gap.
When a window is closed, its results are released to the next stage.
Example
You define a partition of $userId, a gap of 5 minutes, and an allowed lateness of 5 seconds. When you start your stream processor:
A window opens when the first document reaches the $sessionWindow stage.
Any documents that reach the stage that contain the same userId value (including an absence of this field) within five minutes and five seconds of the most recent matching document arriving at the processor are added to the same session window.
The window remains open until no documents are added to it for the gap of five minutes plus the allowed lateness of five seconds.
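The session rule can be approximated in a few lines of Python. This is a simplified sketch rather than the service's algorithm: it sorts documents by a hypothetical `ts` field in seconds, treats a missing `userId` as its own partition (`None`), starts a new session when the difference from the previous document in the partition reaches the gap, and ignores allowed lateness entirely.

```python
def session_windows(docs, gap):
    """Group documents into sessions per partition key.

    Returns a list of (key, [timestamps]) pairs: closed sessions first,
    followed by the sessions still open when the input ends.
    """
    open_sessions = {}  # partition key -> documents in the current session
    closed = []
    for doc in sorted(docs, key=lambda d: d["ts"]):
        key = doc.get("userId")  # a missing field maps to the None partition
        session = open_sessions.get(key)
        if session and doc["ts"] - session[-1]["ts"] >= gap:
            closed.append((key, session))  # gap exceeded: close the session
            session = None
        if session is None:
            session = open_sessions[key] = []
        session.append(doc)
    closed.extend(open_sessions.items())
    return [(key, [d["ts"] for d in session]) for key, session in closed]

# Hypothetical events; the gap is 5 minutes (300 seconds).
events = [
    {"userId": "a", "ts": 0}, {"userId": "b", "ts": 60}, {"ts": 100},
    {"userId": "a", "ts": 120}, {"userId": "b", "ts": 200}, {"userId": "a", "ts": 500},
]
sessions = session_windows(events, 300)
```

User `a` produces two sessions because 380 seconds pass between the events at 120 and 500, while the event without a `userId` forms a session of its own.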