Filesystem store
Make sure you set the options described in the Filesystem Syncs and Dataset Partitioning section. Failure to do so may cause the processor to output an empty dataset.
Overview
Squid SDK provides append-only Store interface implementations for saving the data generated by squids to files. The store is designed primarily for offline analytics. It supports CSV and Parquet files, persisted either locally or to S3-compatible storage. See the list of Packages below.
File-based stores always partition data sets along the "block height" dimension, even when it is not in the schema. The number of blocks per partition is variable. It is determined by an algorithm that balances the lag of the indexed data against the number of partitions produced by the store. Make sure to configure the algorithm so that the tradeoff is appropriate for your use case.
The store provides the same failover guarantees as the Postgres-based store: after a restart, the processor rolls back to the last successfully persisted state.
The core component of all file-based stores is the Database class from @subsquid/file-store. To use it, construct an instance and pass it to processor.run(). As a result, ctx.store will expose table writers that accept rows of data to be stored.
Example
Save ERC20 Transfer events retrieved by the EVM processor into transfers.csv files:
import {EvmBatchProcessor} from '@subsquid/evm-processor'
import * as erc20abi from './abi/erc20'
import {Database, LocalDest} from '@subsquid/file-store'
import {Column, Table, Types} from '@subsquid/file-store-csv'

const processor = /* processor definition */

const dbOptions = {
  tables: {
    TransfersTable: new Table('transfers.csv', {
      from: Column(Types.String()),
      to: Column(Types.String()),
      value: Column(Types.Numeric())
    })
  },
  dest: new LocalDest('./data'),
  chunkSizeMb: 10
}

processor.run(new Database(dbOptions), async (ctx) => {
  for (let c of ctx.blocks) {
    for (let log of c.logs) {
      if (/* the log item is a Transfer we're interested in */) {
        let { from, to, value } = erc20abi.events.Transfer.decode(log)
        ctx.store.TransfersTable.write({ from, to, value })
      }
    }
  }
})
The resulting ./data folder may look like this:
./data/
├── 0000000000-0007688959
│ └── transfers.csv
├── 0007688960-0007861589
│ └── transfers.csv
...
├── 0016753040-0016762029
│ └── transfers.csv
└── status.txt
Each of the folders here contains a little over 10 MB of data. status.txt contains the height of the last indexed block and its hash.
Packages
@subsquid/file-store is the core package that contains the implementation of Database for filesystems. At least one file format addon must be installed alongside it:
- CSV: supported via @subsquid/file-store-csv.
- Parquet: an advanced format that works well for larger data sets. Supported via @subsquid/file-store-parquet.
Data in either of these formats can be written to:
- A local filesystem: supported by @subsquid/file-store out of the box.
- A bucket in an Amazon S3-compatible cloud: supported via @subsquid/file-store-s3.
Database Options
The constructor of the Database class from file-store accepts a configuration object as its only argument. Its format is as follows:
DatabaseOptions {
  tables: Record<string, Table>
  dest: Dest
  chunkSizeMb?: number
  syncIntervalBlocks?: number
  hooks?: DatabaseHooks<Dest>
}
Here,
- Table is an interface for classes that make table writers, objects that convert in-memory tabular data into format-specific file contents. An implementation of Table is available for every file format supported by file-store. Consult the pages on specific output formats to find out how to define Tables.
- tables is a mapping from developer-defined string handles to Table instances. A table writer will be created for each Table in this mapping. It will be exposed at ctx.store.<tableHandle>.
- dest is an instance of Dest, an interface for objects that take the properly formatted file contents and write them onto a particular filesystem. An implementation of Dest is available for every filesystem supported by file-store. For local filesystems, use the LocalDest class from the @subsquid/file-store package and supply new LocalDest(outputDirectoryName) here. For other targets, consult the documentation pages specific to your filesystem choice.
- chunkSizeMb, syncIntervalBlocks and hooks are optional parameters that tune the behavior of the dataset partitioning algorithm.
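For instance, a configuration along these lines (a sketch; the handles, file names and columns are made up, with Table, Column and Types coming from @subsquid/file-store-csv) creates two table writers:

import {Database, LocalDest} from '@subsquid/file-store'
import {Column, Table, Types} from '@subsquid/file-store-csv'

const db = new Database({
  tables: {
    transfers: new Table('transfers.csv', {
      from: Column(Types.String()),
      to: Column(Types.String())
    }),
    approvals: new Table('approvals.csv', {
      owner: Column(Types.String()),
      spender: Column(Types.String())
    })
  },
  dest: new LocalDest('./data')
})
// In the batch handler, the writers are exposed as
// ctx.store.transfers and ctx.store.approvals.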
Table Writer Interface
For each Table supplied via the tables field of the constructor argument, Database adds a table writer property to ctx.store. The writer is exposed at ctx.store.<tableHandle>, where <tableHandle> is the key of the Table instance in the tables mapping. It has the following methods:
ctx.store.<tableHandle>.write(record: T)
ctx.store.<tableHandle>.writeMany(records: T[])
Here, T is a Table implementation-specific data row type. See the documentation pages on specific file formats for details.
These synchronous methods add rows of data to an in-memory buffer and perform no actual filesystem writes. Instead, the write happens automatically when a new dataset partition is created. The methods return the table writer instance and can be chained.
For example, with a Database defined like this:
const db = new Database({
  tables: {
    TransfersTable: new Table(/* table options */)
  },
  // ...dest and dataset partitioning options
})
the following calls become available in the batch handler:
processor.run(db, async ctx => {
  let record = // row in a format specific to Table implementation
  ctx.store.TransfersTable.write(record)
  ctx.store.TransfersTable.writeMany([record])
})
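Since both methods return the table writer instance, the calls above can also be chained:

ctx.store.TransfersTable
  .write(record)
  .writeMany([record])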
Filesystem Syncs and Dataset Partitioning
As the indexing proceeds, the processor continues to append new data to the in-memory buffer of Database. At the end of each batch, the Database instance decides whether to write a new dataset partition. A partition is written when either
- the amount of data stored in the buffer across all tables exceeds chunkSizeMb (default: 20 MB), or
- syncIntervalBlocks is finite (default: infinite), there is at least one row of data in the buffer, and either
  - the processor has just reached the blockchain head, or
  - the processor is at the blockchain head and at least syncIntervalBlocks blocks have passed since the last sync.
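These rules can be paraphrased with the following sketch. It is not the actual implementation inside @subsquid/file-store; all names in it are illustrative.

// Illustrative paraphrase of the end-of-batch partitioning decision.
// None of these names are part of the file-store API.
interface PartitioningState {
  bufferSizeMb: number        // buffered data across all tables, in MB
  bufferedRows: number        // total number of buffered rows
  justReachedHead: boolean    // the chain head was reached within this batch
  atChainHead: boolean        // the processor is at the chain head
  blocksSinceLastSync: number // blocks processed since the last partition write
}

function shouldWritePartition(
  s: PartitioningState,
  chunkSizeMb: number = 20,
  syncIntervalBlocks: number = Infinity
): boolean {
  if (s.bufferSizeMb > chunkSizeMb) return true
  if (Number.isFinite(syncIntervalBlocks) && s.bufferedRows > 0) {
    if (s.justReachedHead) return true
    if (s.atChainHead && s.blocksSinceLastSync >= syncIntervalBlocks) return true
  }
  return false
}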
This approach keeps the number of files reasonable while ensuring that the dataset is up-to-date for both high and low output data rates:
- For high, stable data rates (e.g. events from the USDC contract), it suffices to keep syncIntervalBlocks at the default value (infinity). Once the processor catches up with the blockchain, it will keep producing partitions of roughly the same size at block intervals much shorter than the full chain height, thus keeping the dataset updated. The balance between dataset latency and the number of files is tuned with the chunkSizeMb parameter.
- For low data rates (e.g. LiquidationCall events from the AAVE V2 LendingPool, emitted once every few hours), syncIntervalBlocks makes it possible to govern the dataset latency directly. The first chunk is written once the blockchain head is reached; after that the processor keeps the dataset up to date to within syncIntervalBlocks blocks behind the chain. The absence of new partitions beyond that range has a clear interpretation: no new data is available.
If syncIntervalBlocks is kept at the default value and the data rate is low, the resulting dataset may never catch up with the chain in any practical sense. In extreme cases the processor may write no data at all. For example, all the aforementioned LiquidationCall events fit into a CSV file of less than 20 MB. With the default values of syncIntervalBlocks and chunkSizeMb, the buffer never reaches the size threshold and no data is written to the filesystem.
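A minimal sketch of a low-data-rate configuration, assuming the same imports as in the CSV example above; the file name, columns and the value of syncIntervalBlocks are made up for illustration:

const lowRateDb = new Database({
  tables: {
    LiquidationsTable: new Table('liquidations.csv', {
      user: Column(Types.String()),
      collateralAsset: Column(Types.String())
    })
  },
  dest: new LocalDest('./data'),
  // keep the default 20 MB chunk size, but once the chain head is reached
  // write a partition at least every 300 blocks whenever the buffer is non-empty
  syncIntervalBlocks: 300
})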
By default, Database maintains a record of the syncing progress in the status.txt file. When a processor with a Database instance starts, it calls the onStateRead() function, which reads the highest reached block from status.txt on the target filesystem and returns its hash and height. If the file does not exist, the function returns -1 for the height and syncing starts from the next (that is, the zeroth/genesis) block.
The syncing status record is updated every time a new partition is written to the dataset: the processor calls onStateUpdate(), which overwrites status.txt with the new highest reached block.
As a result, the dataset is guaranteed to be consistent with the blockchain history up to the block recorded in status.txt.
The functions onStateRead() and onStateUpdate() can be overridden using the hooks field of the constructor argument. To do that, set the field to
DatabaseHooks<Dest> {
  onStateRead(dest: Dest): Promise<HashAndHeight | undefined>
  onStateUpdate(dest: Dest, info: HashAndHeight): Promise<void>
}
Parameters:
- dest: the Dest object used by Database. Use it to access the filesystem.
- info: a {height: number, hash: string} object.
Overriding these functions can be useful for transferring some processor state between batches reliably. A basic example of using hooks can be found here.
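For illustration only, a custom pair of hooks could keep the status record in a JSON file instead. The sketch below assumes that the Dest implementation in use (LocalDest here) exposes exists(), readFile() and writeFile() methods; verify against the actual Dest interface of the file-store version you use.

import {Database, LocalDest} from '@subsquid/file-store'

const db = new Database({
  tables: { /* table definitions */ },
  dest: new LocalDest('./data'),
  hooks: {
    async onStateRead(dest) {
      // 'status.json' is an arbitrary name chosen for this sketch
      if (await dest.exists('status.json')) {
        let {height, hash} = JSON.parse(await dest.readFile('status.json'))
        return {height, hash}
      }
      return undefined
    },
    async onStateUpdate(dest, info) {
      await dest.writeFile('status.json', JSON.stringify(info))
    }
  }
})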