Version: Firesquid

Batch processor in action

For an end-to-end, idiomatic use of SubstrateBatchProcessor, see the squid-substrate-template; more elaborate examples are also available.

Here we highlight the key steps, combining the processor configuration with the data handling definition to illustrate the concepts covered so far.

1. Configuration

See the Configuration section for more details.

import {lookupArchive} from "@subsquid/archive-registry"
import {SubstrateBatchProcessor} from "@subsquid/substrate-processor"

const processor = new SubstrateBatchProcessor()
    // set the data source
    .setDataSource({
        archive: lookupArchive("kusama", {release: "FireSquid"})
    })
    // set subscriptions and define the data
    // to be fetched for each log item
    .addEvent('Balances.Transfer', {
        data: {
            event: {
                args: true,
                extrinsic: {
                    hash: true,
                    fee: true
                }
            }
        }
    })
    .addCall('Balances.transfer_keep_alive', {
        data: {
            call: {
                success: true,
                args: true,
                extrinsic: {
                    args: true
                }
            },
        }
    })

2. Define a custom data facade and extract normalized data from BatchContext

The following snippet shows how the data is extracted and normalized into a user-defined facade interface, TransferData. Each Item is handled according to its kind and name, and the resulting normalized data preserves the canonical ordering of ctx.blocks and of the items within each block.

Type-safe access to, and decoding of, the call and event data relies on the classes generated by the appropriate typegen tool.

import {BatchContext, BatchProcessorItem, SubstrateBatchProcessor} from "@subsquid/substrate-processor"
import {Store, TypeormDatabase} from "@subsquid/typeorm-store"
import * as ss58 from "@subsquid/ss58"
// typegen output; the exact path depends on the typegen configuration
import {BalancesTransferEvent} from "./types/events"

// ... processor configuration from the previous step

type Item = BatchProcessorItem<typeof processor>
type Ctx = BatchContext<Store, Item>

// some normalized user-defined data
interface TransferData {
    id: string
    blockNumber: number
    timestamp: Date
    extrinsicHash?: string
    from: string
    to: string
    amount: bigint
    fee?: bigint
}

// extract and normalize
function getTransfers(ctx: Ctx): TransferData[] {
    let transfers: TransferData[] = []
    for (let block of ctx.blocks) {
        for (let item of block.items) {
            if (item.kind === "event" && item.name === "Balances.Transfer") {
                // instantiate the classes generated by typegen
                // to decode the item data
                let e = new BalancesTransferEvent(ctx, item.event)
                let rec: {from: Uint8Array, to: Uint8Array, amount: bigint}
                // normalize the historical versions of the data
                // using the version getters; pair each `asVxxxx`
                // getter with its own `isVxxxx` check
                if (e.isV1020) {
                    let [from, to, amount] = e.asV1020
                    rec = {from, to, amount}
                } else if (e.isV1050) {
                    let [from, to, amount] = e.asV1050
                    rec = {from, to, amount}
                } else {
                    rec = e.asV9130
                }
                // extract and normalize the `item.event` data
                transfers.push({
                    // some data manipulation with `item.event` data
                    id: item.event.id,
                    blockNumber: block.header.height,
                    timestamp: new Date(block.header.timestamp),
                    extrinsicHash: item.event.extrinsic?.hash,
                    from: ss58.codec('kusama').encode(rec.from),
                    to: ss58.codec('kusama').encode(rec.to),
                    amount: rec.amount,
                    fee: item.event.extrinsic?.fee || 0n
                })
            }
            if (item.kind === "call" && item.name === "Balances.transfer_keep_alive") {
                // extract and normalize the `item.call` data
                transfers.push({ /* some data manipulation with `item.call` data */ })
            }
        }
    }
    return transfers
}
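
The dispatch-and-normalize pattern used by getTransfers can also be tried without a running processor. The sketch below is a simplified illustration under stated assumptions: MockItem, MockBlock, TransferArgs, Row, normalizeArgs and collectTransfers are hypothetical stand-ins rather than part of @subsquid/substrate-processor, the two "versions" only mimic the tuple and struct shapes handled above, and amounts are plain numbers instead of bigint to keep the mock small.

```typescript
// Mock types for illustration only; in a squid the item type comes from
// BatchProcessorItem<typeof processor> and decoding from typegen classes.
type TransferArgs =
    | {version: "tuple"; value: [string, string, number]}
    | {version: "struct"; value: {from: string; to: string; amount: number}}

type MockItem =
    | {kind: "event"; name: "Balances.Transfer"; id: string; args: TransferArgs}
    | {kind: "call"; name: "Balances.transfer_keep_alive"; id: string}

interface MockBlock {
    height: number
    items: MockItem[]
}

interface Row {
    id: string
    blockNumber: number
    from: string
    to: string
    amount: number
}

// Bring both historical encodings to a single record shape.
function normalizeArgs(args: TransferArgs): {from: string; to: string; amount: number} {
    if (args.version === "tuple") {
        const [from, to, amount] = args.value
        return {from, to, amount}
    }
    return args.value
}

// Walk blocks and items in the order given, so the output keeps that order.
function collectTransfers(blocks: MockBlock[]): Row[] {
    const rows: Row[] = []
    for (const block of blocks) {
        for (const item of block.items) {
            if (item.kind === "event" && item.name === "Balances.Transfer") {
                rows.push({id: item.id, blockNumber: block.height, ...normalizeArgs(item.args)})
            }
            // items of other kinds (e.g. calls) would get their own branch here
        }
    }
    return rows
}

const rows = collectTransfers([
    {height: 1, items: [
        {kind: "call", name: "Balances.transfer_keep_alive", id: "1-0"},
        {kind: "event", name: "Balances.Transfer", id: "1-1", args: {version: "tuple", value: ["a", "b", 5]}},
    ]},
    {height: 2, items: [
        {kind: "event", name: "Balances.Transfer", id: "2-0", args: {version: "struct", value: {from: "b", to: "c", amount: 7}}},
    ]},
])
```

Because the kind and name checks narrow the union type, the compiler only allows access to args inside the event branch, which is the same type-safety benefit the real Item type provides.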

3. Run the processor and store the transformed data into the target database

The snippet below assumes that we are using TypeormDatabase and that the database schema, together with the model entity types, has already been prepared. Consult [/develop-a-squid/schema-spec] for more details.

import {Store, TypeormDatabase} from "@subsquid/typeorm-store"
// an entity type generated from the schema file
import {Transfer} from "./model"

// ... processor configuration
// ... `getTransfers(Ctx)` definition
// ... `getAccount()` helper definition (not shown here)

processor.run(new TypeormDatabase(), async ctx => {
    // get the normalized data from the context
    let transfersData = getTransfers(ctx)

    // an array of entity classes
    let transfers: Transfer[] = []

    // ... `accounts`: a lookup of account entities keyed by id (not shown here)

    for (let t of transfersData) {
        let {id, blockNumber, timestamp, extrinsicHash, amount, fee} = t

        // join some extra data
        let from = getAccount(accounts, t.from)
        let to = getAccount(accounts, t.to)

        // populate the entity classes
        // with the extracted data
        transfers.push(new Transfer({
            id,
            blockNumber,
            timestamp,
            extrinsicHash,
            from,
            to,
            amount,
            fee
        }))
    }

    // persist an array of entities
    // in a single statement
    await ctx.store.insert(transfers)
})
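
The handler above calls getAccount(accounts, ...) without showing it. One plausible shape for such a helper, given here only as a sketch, is a get-or-create lookup over an in-memory Map. The Account class below is a hypothetical stand-in for an entity generated from the schema file, and the Map-based signature is an assumption rather than something defined on this page.

```typescript
// Stand-in for the Account entity that would be generated from the schema file.
class Account {
    id: string
    constructor(id: string) {
        this.id = id
    }
}

// Return the cached Account for `id`, creating and caching it on first use,
// so that every transfer touching the same address shares one entity instance.
function getAccount(accounts: Map<string, Account>, id: string): Account {
    let acc = accounts.get(id)
    if (acc == null) {
        acc = new Account(id)
        accounts.set(id, acc)
    }
    return acc
}

const accounts = new Map<string, Account>()
const a1 = getAccount(accounts, "alice")
const a2 = getAccount(accounts, "alice")
const b = getAccount(accounts, "bob")
```

In a real handler the map would likely be pre-filled with accounts already present in the database, and any newly created accounts would need to be saved before the transfers that reference them; both steps are omitted from this sketch.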