
Chain Time Streaming with Amp, Part 1
This post establishes the theory behind the streaming side of Amp and what it means to process data in chain time, including a deep dive into the special case of streaming joins.
This series is for data engineers and analysts building on blockchain data. By the end of this post, you'll understand why blockchains introduce distinct challenges for data stream processing and how Amp addresses them with the concept of chain time.
Introduction
Amp is a blockchain-native database. Being a database, it represents all data as tables and rows. Being blockchain-native, it must also be a streaming system, ingesting new blocks efficiently and keeping data consistent across forks and chain reorganizations.
This blog series will explore the streaming side of Amp, with this first post establishing the theory behind it and what it means to process data in chain time. We will also dive into the special case of streaming joins.
Streams and timestamps
Blockchains are always producing new blocks, so analyzing blockchain data requires a streaming system. Traditional streaming systems have two notions of time:
- Event time, the timestamp at which a source generated an event.
- Processing time, the timestamp at which the system received the event.
Books such as Streaming Systems¹ have been written about this distinction. The short story is that users expect their system to respect event time, but because events arrive late or out of order, processing time often imposes cutoffs or reordering of data. This is a common cause of inconsistency and non-determinism in streaming systems.
Blockchain consensus gives a direct solution to the problem of consistent ordering: there is consensus on a global block order, and within each block on the order of its transactions. Yet even as a source with stronger guarantees than most traditional streaming sources, blockchains introduce their own particularities. They have native notions of time and progress, such as block numbers and block hashes, but more critically, the possibility of chain reorgs and forks.
A data streaming system that needs to ingest and keep up with each new block will face problems that event time and processing time alone cannot solve. Enter chain time.
Chain time
A traditional streaming system would have timestamps as its hardcoded representation of time. Blocks do have a timestamp attached to them, but timestamps alone are not sufficient. A blockchain database needs to keep data consistent across forks and reorgs, so block numbers and block hashes are crucial for reasoning about the ordering and forking of the chain.
Amp bridges block numbers and hashes, respectively, to the concepts of watermark and cursor, which are more familiar to a streaming data analyst. Block numbers are how we typically reason about blockchain progress, while block hashes are used to unambiguously address a position in a DAG of forks. Likewise, a watermark is a row-attached measure of progress, while cursors are more granular and complex position markers used in databases for resuming a long-running query.
So, to follow the transition from blockchain data to a streaming system, we can update our mental models with these analogies. Block numbers are watermarks. Block hashes are cursors.
Streaming query execution
Amp is a SQL database, but it must also behave as a streaming system. Raw blockchain data is represented by tables such as blocks, transactions and logs. Rows are continuously being appended to them. So how will this stream theory be applied?
Let's start with an example query. Say we want to track all outgoing transfer values for a given wallet address, we would query Amp with something like:
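A minimal form of such a query, with the wallet address abbreviated, could be:

```sql
SELECT value
FROM "edgeandnode/ethereum_mainnet@0.1.0".transactions
WHERE sender = x'34fde..'
SETTINGS stream = true
```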

First, let's break down what's going on here:
- `SELECT value` — we are selecting the `value` column, which represents the transferred amount in a transaction.
- `FROM "edgeandnode/ethereum_mainnet@0.1.0".transactions` — this is the fully qualified table name in Amp. The format is `author/dataset@version`, followed by the table name. Here we are querying the `transactions` table from the `ethereum_mainnet` dataset published by `edgeandnode`.
- `WHERE sender = x'34fde..'` — we filter to only transactions sent from our wallet address of interest. The `x''` syntax denotes a hex literal.
- `SETTINGS stream = true` — this is what turns the query into a streaming query. Instead of returning a static result set, Amp will continuously emit new rows as new blocks are ingested.
Let's dive into how Amp would process this as a stream. The diagram below tracks how watermarks and cursors would evolve over time in an Amp streaming query or a materialized Amp dataset.

Now we can better visualize how Amp annotates the data flow across chain time. Processing is done in microbatches. In this example there are three microbatches, each only two blocks long, and actual SQL execution happens within each microbatch.
Amp applies the SQL statement to each microbatch, projecting columns and filtering rows by the WHERE clause. Amp uses DataFusion, an embedded Rust SQL engine for high-throughput processing. Microbatches are processed sequentially, but within each microbatch execution can be parallelized by DataFusion. In real deployments, a single microbatch can span hundreds of thousands of blocks.
Every row receives a watermark, which here is simply the associated block number. Each microbatch is also associated with a final watermark and cursor. In this way, chain time is threaded from the source raw data through layers of stream processing. The transformed data inherits the watermark and cursor of its source.
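A toy sketch of this bookkeeping, with block numbers standing in for watermarks and block hashes for cursors (the field names here are illustrative, not Amp's schema):

```python
# Chain-time annotations carried through one microbatch: each output row
# inherits the watermark (block number) of its source row, and the batch
# as a whole ends with a final watermark and cursor.

batch = [
    {"block": 7, "hash": "0xa1", "value": 10},
    {"block": 8, "hash": "0xb2", "value": 25},
]

# Transformed rows inherit chain time from their source rows.
out = [{"value": r["value"], "watermark": r["block"]} for r in batch if r["value"] > 20]

final_watermark = batch[-1]["block"]  # progress marker for the microbatch
final_cursor = batch[-1]["hash"]      # unambiguous position in the fork DAG

assert out == [{"value": 25, "watermark": 8}]
assert (final_watermark, final_cursor) == (8, "0xb2")
```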
The output can be either consumed directly as a streaming query by a client, or written to a derived dataset table. In both cases, the query is materialized incrementally, microbatch by microbatch, rigorously tracking chain time.
Resumable queries
One powerful feature enabled by chain time cursors is resumable streaming queries. If we wanted to be notified whenever this wallet makes a transfer over a certain threshold amount, we can adapt our original query like this:
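One possible shape for the adapted query, with a hypothetical threshold of 1 ETH expressed in wei:

```sql
SELECT value
FROM "edgeandnode/ethereum_mainnet@0.1.0".transactions
WHERE sender = x'34fde..'
  AND value > 1000000000000000000 -- hypothetical threshold: 1 ETH in wei
SETTINGS stream = true
```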

In practice, this query will be a long-running connection to Amp, subject to network disconnections and timeouts. To avoid being notified twice about the same transaction across reconnects, we use the cursor. In a streaming gRPC query, Amp exposes the cursor inside the `app_metadata` field. By storing the last received cursor client-side, the query can be resumed by passing it in the `amp-resume` request header.
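The reconnect pattern can be sketched as follows. The server and transport are simulated here; the real client libraries abstract over this loop:

```python
# Sketch of cursor-based stream resumption. A simulated server yields
# (cursor, row) pairs and drops the connection once; the client persists
# the last cursor and resumes strictly after it, so no row is duplicated.

ROWS = [("0xaaa", 1), ("0xbbb", 2), ("0xccc", 3), ("0xddd", 4)]

def simulated_stream(resume_cursor=None, fail_after=None):
    """Yield (cursor, row) pairs, resuming strictly after resume_cursor."""
    start = 0
    if resume_cursor is not None:
        start = next(i for i, (c, _) in enumerate(ROWS) if c == resume_cursor) + 1
    for n, (cursor, row) in enumerate(ROWS[start:]):
        if n == fail_after:
            raise ConnectionError("stream dropped")
        yield cursor, row

def consume_with_resume():
    seen, last_cursor = [], None
    fail_after = 2  # drop the connection once, mid-stream
    while True:
        try:
            for cursor, row in simulated_stream(last_cursor, fail_after):
                seen.append(row)
                last_cursor = cursor  # persist client-side after each row
            return seen
        except ConnectionError:
            fail_after = None  # reconnect and resume from last_cursor

assert consume_with_resume() == [1, 2, 3, 4]  # no duplicates, no gaps
```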
Future parts of this blog series will explore how the high-level APIs of Amp client libraries (Rust, Python, and TypeScript) abstract over cursors for robust stream handling.
Will it stream?
You might have noticed that the transfer-tracking query example given above had a simple SQL structure, using only SELECT projections and WHERE filter clauses. Projections and filters can trivially be applied block by block, or even row by row, and the end result is the same as if we had applied them over the entire table at once.
This property is very convenient for streaming, as we can process a new batch of data in isolation from the historical data. Or when doing historical reprocessing, we can tune the microbatch size to the most efficient value for a workload.
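A minimal sketch of why stateless operators commute with microbatching:

```python
# Filtering each microbatch and concatenating the results gives exactly
# the same output as filtering the whole history at once.

rows = [{"block": b, "value": b * 10} for b in range(6)]

def transfer_filter(batch):  # stands in for a projection + WHERE clause
    return [r["value"] for r in batch if r["value"] >= 20]

whole = transfer_filter(rows)

batched = []
for i in range(0, len(rows), 2):  # microbatches of 2 blocks, as in the diagram
    batched += transfer_filter(rows[i:i + 2])

assert whole == batched  # order and content both preserved
```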
However, not all SQL operations are created equal for streaming. Projections and filters are stateless and compose naturally with microbatch execution. Stateful operations like GROUP BY and JOIN are a different class of problem. Keeping Amp's high-throughput properties while adding support for stateful operations is a real technical challenge.
Streaming joins
The essential relational operator in SQL is the join. In the blockchain domain, it appears constantly. Joining transactions to logs, transfers to addresses, contracts to their events, are all essential operations in blockchain data processing. Say we wanted to extend our previous example, from single address tracking to a dynamic list of watchlist addresses:
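One possible shape for such a query, assuming a hypothetical watchlist table that holds the tracked addresses:

```sql
SELECT t.value
FROM "edgeandnode/ethereum_mainnet@0.1.0".transactions AS t
JOIN watchlist AS w ON t.sender = w.address -- watchlist is hypothetical
SETTINGS stream = true
```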

This simple example exposed a crux in the Amp architecture. For non-streaming queries, our DataFusion SQL execution layer can process it directly. But DataFusion is designed for high-throughput columnar execution of batch queries, not for stateful streaming². If we fell back to stateful row-oriented execution, we would lose orders of magnitude in data throughput. So how can we leverage DataFusion's ability to rip through data while maintaining correct join semantics?
To solve this problem, we needed to go back to research fundamentals. We can more formally state the problem as:
Given two tables T and S, and a pre-computed join output T ⋈ S, we must efficiently compute the join update Δ(T ⋈ S), given ΔT and ΔS.
Here ⋈ denotes the join and Δ denotes the delta, i.e. the set of new or changed rows. Crucially, an inner join is an append-only operation: it only adds rows to its output over time.
This problem is widely studied in database research, particularly in the IVM (incremental view maintenance) literature³, and has been solved under various frameworks and approaches. Relational algebra in particular shows the way forward here, through the join update rule:
Δ(T ⋈ S) = (ΔT ⋈ S) ∪ (T ⋈ ΔS) ∪ (ΔT ⋈ ΔS)
The following diagram illustrates this more visually.

Relative to the naive approach of recomputing the join from scratch every time, the efficiency gain is clear: every term involves the delta rows, which have a much smaller cardinality than the entire history.
Bringing this back to Amp, we can now compute each of the three terms in the join formula using DataFusion. The deltas map directly to Amp's microbatch execution model: two input microbatches come in for T and S, and an output microbatch is computed for the join.
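The update rule can be sanity-checked on toy relations, with plain Python sets standing in for tables and microbatch deltas:

```python
# Verify Δ(T ⋈ S) = (ΔT ⋈ S) ∪ (T ⋈ ΔS) ∪ (ΔT ⋈ ΔS) on small relations,
# using an inner equi-join on the first field of each tuple.

def join(t, s):
    return {(k1, a, b) for (k1, a) in t for (k2, b) in s if k1 == k2}

T  = {(1, "tx1"), (2, "tx2")}   # historical transactions
S  = {(1, "addrA")}             # historical watchlist entries
dT = {(3, "tx3"), (1, "tx4")}   # new transactions in this microbatch
dS = {(2, "addrB")}             # new watchlist entry in this microbatch

previous = join(T, S)                               # pre-computed T ⋈ S
delta    = join(dT, S) | join(T, dS) | join(dT, dS) # the three update terms
full     = join(T | dT, S | dS)                     # naive recomputation

# The inner join is append-only: the old output plus the delta is the new output.
assert full == previous | delta
```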
We can now be confident that our watchlist query above is viable over millions of transactions. Only the new batch is joined against the full history, not the full history against itself.
Conclusion
Blockchains introduce challenges that traditional stream processing systems were not designed to handle. Chain time is the model Amp uses to process blockchain streams reliably.
In the next post, we will look more closely at how chain time works in practice, including how Amp's client libraries handle streaming.
If you are building systems that depend on real-time blockchain data, request a demo to see how chain-time streaming works in practice.
Footnotes
[1] Streaming Systems https://www.oreilly.com/library/view/streaming-systems/9781491983867/
[2] In fairness to DataFusion, it has some support for streaming sources and streaming joins. However, that join algorithm requires keeping both sides of the join in memory.
[3] For one reference, see the DBSP paper https://www.vldb.org/pvldb/vol16/p1601-budiu.pdf