Version: ArrowSquid

Filesystem store

danger

Make sure you set the options described in the Filesystem Syncs and Dataset Partitioning section. Failure to do so may cause the processor to output an empty dataset.

Overview

Squid SDK provides append-only Store interface implementations for saving the data generated by squids to files. The store is designed primarily for offline analytics. It supports CSV and Parquet files, persisted either locally or to S3-compatible storage. See the list of Packages below.

File-based stores always partition the dataset along the block height dimension, even when block height is not part of the schema. The number of blocks per partition is variable: it is determined by an algorithm that balances the lag of the indexed data against the number of partitions the store produces. Make sure to configure the algorithm so that this tradeoff suits your use case.

The store provides the same failover guarantees as the Postgres-based store: after a restart, the processor rolls back to the last successfully persisted state.

The core component of all file-based stores is the Database class from @subsquid/file-store. To use it, construct an instance and pass it to processor.run(). This results in ctx.store exposing table writers that accept rows of data to be stored.

Example

Save ERC20 Transfer events retrieved by an EVM processor to transfers.csv files:

import {EvmBatchProcessor} from '@subsquid/evm-processor'
import * as erc20abi from './abi/erc20'
import {Database, LocalDest} from '@subsquid/file-store'
import {Column, Table, Types} from '@subsquid/file-store-csv'

const processor = /* processor definition */

const dbOptions = {
  tables: {
    TransfersTable: new Table('transfers.csv', {
      from: Column(Types.String()),
      to: Column(Types.String()),
      value: Column(Types.Numeric())
    })
  },
  dest: new LocalDest('./data'),
  chunkSizeMb: 10
}

processor.run(new Database(dbOptions), async (ctx) => {
  for (let c of ctx.blocks) {
    for (let log of c.logs) {
      // only process the ERC20 Transfer events we're interested in
      if (log.topics[0] === erc20abi.events.Transfer.topic) {
        let {from, to, value} = erc20abi.events.Transfer.decode(log)
        ctx.store.TransfersTable.write({from, to, value})
      }
    }
  }
})

The resulting ./data folder may look like this:

./data/
├── 0000000000-0007688959
│   └── transfers.csv
├── 0007688960-0007861589
│   └── transfers.csv
...
├── 0016753040-0016762029
│   └── transfers.csv
└── status.txt

Each of the folders here contains a little over 10 MB of data. status.txt contains the height of the last indexed block and its hash.

Packages

@subsquid/file-store is the core package that contains the implementation of Database for filesystems. At least one file format addon must be installed alongside it:

  • @subsquid/file-store-csv for writing CSV files.
  • @subsquid/file-store-parquet for writing Parquet files.

Data in either of these formats can be written to

  • A local filesystem: Supported by @subsquid/file-store out of the box.
  • A bucket in an Amazon S3-compatible cloud: Supported via @subsquid/file-store-s3.
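For example, pointing the Database from the example above at a bucket instead of the local ./data folder only requires swapping the dest. A minimal sketch, assuming the S3Dest class exported by @subsquid/file-store-s3 and credentials configured as described on that package's page; the constructor arguments below are illustrative:

import {Database} from '@subsquid/file-store'
import {S3Dest} from '@subsquid/file-store-s3'

const db = new Database({
  tables: { /* same table definitions as in the example above */ },
  // 'transfers-data' is a hypothetical folder within a hypothetical 'my-bucket' bucket
  dest: new S3Dest('transfers-data', 'my-bucket'),
  chunkSizeMb: 10
})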

Database Options

The constructor of the Database class from @subsquid/file-store accepts a configuration object as its only argument. Its format is as follows:

DatabaseOptions {
  tables: Record<string, Table>
  dest: Dest
  chunkSizeMb?: number
  syncIntervalBlocks?: number
  hooks?: DatabaseHooks<Dest>
}

Here,

  • Table is an interface for classes that make table writers, objects that convert in-memory tabular data into format-specific file contents. An implementation of Table is available for every file format supported by file-store. Consult pages about specific output formats to find out how to define Tables.
  • tables is a mapping from developer-defined string handles to Table instances. A table writer will be created for each Table in this mapping. It will be exposed at ctx.store.<tableHandle>.
  • dest is an instance of Dest, an interface for objects that take the properly formatted file contents and write them onto a particular filesystem. An implementation of Dest is available for every filesystem supported by file-store. For local filesystems use the LocalDest class from the @subsquid/file-store package and supply new LocalDest(outputDirectoryName) here. For other targets consult documentation pages specific to your filesystem choice.
  • chunkSizeMb, syncIntervalBlocks and hooks are optional parameters that tune the behavior of the dataset partitioning algorithm.
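Putting it all together, a Database configured with an explicit sync interval might look like this. The table definition reuses the CSV Table from the example above; the 500-block interval is an arbitrary illustration:

import {Database, LocalDest} from '@subsquid/file-store'
import {Column, Table, Types} from '@subsquid/file-store-csv'

const db = new Database({
  tables: {
    TransfersTable: new Table('transfers.csv', {
      from: Column(Types.String()),
      to: Column(Types.String()),
      value: Column(Types.Numeric())
    })
  },
  dest: new LocalDest('./data'),
  chunkSizeMb: 10,         // write a partition once roughly 10 MB of data is buffered
  syncIntervalBlocks: 500  // at the chain head, also write at least every 500 blocks
})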

Table Writer Interface

For each Table supplied via the tables field of the constructor argument, Database adds a table writer property to ctx.store. The writer is exposed at ctx.store.<tableHandle>, where <tableHandle> is the key of the Table instance in the tables mapping. It has the following methods:

ctx.store.<tableHandle>.write(record: T)
ctx.store.<tableHandle>.writeMany(records: T[])

Here, T is a Table implementation-specific data row type. See the documentation pages on specific file formats for details.

These synchronous methods add rows of data to an in-memory buffer and perform no actual filesystem writes. Instead, the write happens automatically when a new dataset partition is created. The methods return the table writer instance and can be chained.

For example, with a Database defined like this:

const db = new Database({
  tables: {
    TransfersTable: new Table(/* table options */)
  },
  // ...dest and dataset partitioning options
})

the following calls become available in the batch handler:

processor.run(db, async ctx => {
  let record = // row in a format specific to the Table implementation
  ctx.store.TransfersTable.write(record)
  ctx.store.TransfersTable.writeMany([record])
})
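Since both methods return the table writer instance, the calls can also be chained:

ctx.store.TransfersTable
  .write(record)
  .writeMany([record])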

Filesystem Syncs and Dataset Partitioning

As indexing proceeds, the processor keeps appending new data to the in-memory buffer of Database. At the end of each batch, Database decides whether to write a new dataset partition (see the sketch after the list below). A partition is written when either

  • the amount of data stored in the buffer exceeds chunkSizeMb (default - 20 MB) across all tables, or
  • syncIntervalBlocks is finite (default - infinite) and there is at least one row of data in the buffer and either
    • the processor has just reached the blockchain head, or
    • the processor is at the blockchain head and at least syncIntervalBlocks have passed since the last sync.
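In pseudocode, the decision made at the end of each batch looks roughly like this. This is an illustration of the documented conditions, not the actual implementation; all names are hypothetical:

// Sketch of the partitioning decision made at the end of each batch.
function shouldWritePartition(
  bufferSizeMb: number,          // buffered data across all tables, in MB
  bufferRows: number,            // total number of buffered rows
  atChainHead: boolean,          // the processor is at the blockchain head
  justReachedChainHead: boolean, // the processor reached the head in this batch
  blocksSinceLastSync: number,
  chunkSizeMb = 20,              // default
  syncIntervalBlocks = Infinity  // default
): boolean {
  if (bufferSizeMb > chunkSizeMb) return true
  if (Number.isFinite(syncIntervalBlocks) && bufferRows > 0) {
    if (justReachedChainHead) return true
    if (atChainHead && blocksSinceLastSync >= syncIntervalBlocks) return true
  }
  return false
}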

This approach keeps the number of files reasonable while ensuring that the dataset is up-to-date for both high and low output data rates:

  • For high, stable data rates (e.g. events from the USDC contract) it suffices to keep syncIntervalBlocks at its default value (infinity). Once the processor catches up with the blockchain, it keeps producing partitions of roughly the same size at short block intervals, thus keeping the dataset updated. The balance between dataset latency and the number of files is tuned with the chunkSizeMb parameter.
  • For low data rates (e.g. LiquidationCall events from the AAVE V2 LendingPool, emitted once every few hours) syncIntervalBlocks lets you govern the dataset latency directly (a configuration sketch follows the warning below). The first chunk is written once the blockchain head is reached; after that the processor keeps the dataset up to date to within syncIntervalBlocks blocks behind the chain. The absence of new partitions beyond that range then has a clear interpretation: no new data is available.
danger

If syncIntervalBlocks is kept at its default value and the data rate is low, the resulting dataset may never catch up with the chain in any practical sense. In extreme cases the processor may write no data at all. For example, all the aforementioned LiquidationCall events fit into a CSV file of less than 20 MB: with the default values of syncIntervalBlocks and chunkSizeMb the buffer never reaches the threshold and no data is written to the filesystem.
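To avoid this, set syncIntervalBlocks to a finite value matching your latency requirements. A sketch reusing the imports from the example above; the 300-block figure is an arbitrary illustration, roughly one hour at 12-second Ethereum blocks:

const db = new Database({
  tables: { /* table definitions */ },
  dest: new LocalDest('./data'),
  // with sparse data the 20 MB chunkSizeMb default may never be reached,
  // so bound the time between filesystem syncs instead
  syncIntervalBlocks: 300
})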

By default, Database maintains a record of the syncing progress in the status.txt file. When a processor using a Database instance starts, it calls the onStateRead() function, which reads the highest reached block from status.txt on the target filesystem and returns its hash and height. If the file does not exist, the function returns a height of -1 and syncing starts from block 0 (the genesis block).

The syncing status record is updated every time a new partition is written to the dataset: the processor calls onStateUpdate(), which overwrites status.txt with the new highest reached block.

As a result, the integrity of the dataset is guaranteed for the blockchain history up to the block recorded in status.txt.

The functions onStateRead() and onStateUpdate() can be overridden via the hooks field of the constructor argument. To do that, set the field to an object implementing

DatabaseHooks<Dest> {
  onStateRead(dest: Dest): Promise<HashAndHeight | undefined>
  onStateUpdate(dest: Dest, info: HashAndHeight): Promise<void>
}

Parameters:

  • dest: the Dest object used by Database. Use it to access the filesystem.
  • info: a {height: number, hash: string} object.

Overriding these functions can be useful for transferring some processor state between batches reliably. A basic example of using hooks can be found here.
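A minimal sketch of custom hooks that keep the sync status in a JSON file instead of status.txt. It assumes the Dest object exposes exists(), readFile() and writeFile() methods (LocalDest does; check the Dest implementation you use):

import {Database, LocalDest} from '@subsquid/file-store'

const db = new Database({
  tables: { /* table definitions */ },
  dest: new LocalDest('./data'),
  hooks: {
    async onStateRead(dest) {
      // resume from the previously saved state, if any
      if (await dest.exists('status.json')) {
        let {height, hash} = JSON.parse(await dest.readFile('status.json'))
        return {height, hash}
      }
      return undefined // no saved state: start syncing from scratch
    },
    async onStateUpdate(dest, info) {
      // record the new highest indexed block alongside the data
      await dest.writeFile('status.json', JSON.stringify(info))
    }
  }
})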