Skip to main content
Version: ArrowSquid

SubstrateBatchProcessor in action

danger

This page reflects the current state of ArrowSquid Beta release. Interfaces of the processor are stabilized, but interfaces of the typegen-generated wrapper classes are not.

An end-to-end idiomatic usage of SubstrateBatchProcessor can be inspected in the squid-substrate-template and also learned from more elaborate examples.

Here we highlight the key steps and put together a configuration and a data handling definition to illustrate the concepts covered so far.

1. Set up the processor

See Setup section for more details.

src/processor.ts
import {SubstrateBatchProcessor} from '@subsquid/substrate-processor'

export const processor = new SubstrateBatchProcessor()
// set the data source
.setDataSource({
archive: lookupArchive('kusama', {release: 'ArrowSquid'}),
chain: 'https://kusama-rpc.dwellir.com'
})
// add data requests
.addEvent({
name: ['Balances.Transfer'],
extrinsic: true
})
.addCall({
name: ['Balances.transfer_keep_alive'],
extrinsic: true
})
// define the data
// to be fetched for each data item
.setFields({
extrinsic: {
hash: true,
fee: true
},
call: {
success: true
}
})

2. Define a custom data facade and extract normalized data from BatchContext

The following code snippet illustrates how the data is extracted and normalized into some user-specific facade interface TransferData. Note how the data items in each of the block data iterables (events, calls, extrinsics) are filtered in the batch handler to make sure they match the data requests.

Type-safe access and decoding of the call and event data is done with the help of the classes generated by a suitable typegen tool.

import {
SubstrateBatchProcessorFields,
DataHandlerContext
} from '@subsquid/substrate-processor'
import {Store, TypeormDatabase} from '@subsquid/typeorm-store'
import {
BalancesTransferEventV1020,
BalancesTransferEventV1050,
BalancesTransferEventV9130
} from './types/events'

import {processor} from './processor'

type Fields = SubstrateBatchProcessorFields<typeof processor>
type Ctx = DataHandlerContext<Store, Fields>

// some normalized user-define data
interface TransferData {
id: string
timestamp: Date
from: string
to: string
amount: bigint
fee?: bigint
}

// extract and normalize
function getTransfers(ctx: Ctx): TransferData[] {
let transfers: TransferData[] = []
for (let block of ctx.blocks) {
for (let event of block.events) {
if (event.name==='Balances.Transfer') {
// decode the event data with runtime-versioned
// classes generated by typegen
let rec: {from: Uint8Array, to: Uint8Array, amount: bigint}
if (BalancesTransferEventV1020.is(event)) {
let [from, to, amount,] = BalancesTransferEventV1020.decode(event)
rec = {from, to, amount}
} else if (BalancesTransferEventV1050.is(event)) {
let [from, to, amount] = BalancesTransferEventV1050.decode(event)
rec = {from, to, amount}
} else {
rec = BalancesTransferEventV9130.decode(event)
}
// extract and normalize the `item.event` data
transfers.push({
id: event.id,
timestamp: new Date(block.header.timestamp),
extrinsicHash: event.extrinsic?.hash,
from: ss58.codec('kusama').encode(rec.from),
to: ss58.codec('kusama').encode(rec.to),
amount: rec.amount,
fee: event.extrinsic?.fee || 0n
})
}
}
for (let call of block.calls) {
if (call.name==='Balances.transfer_keep_alive') {
// extranct and normalize the call data
transfer.push({
/* some data manipulation on the call data */
})
}
}
}
return transfers
}

3. Run the processor and store the transformed data into the target database

The snippet below assumes that we are using TypeormDatabase and that the database schema together with the model entity types has already been prepared. Consult the Schema file section for more details.

import {Store, TypeormDatabase} from '@subsquid/typeorm-store'
// an entity type generated from the schema file
import {Transfer} from './model'

// ... processor configuration
// ... `getTransfers(Ctx)` definition

processor.run(new TypeormDatabase(), async ctx => {
// get the normalized data from the context
let transfersData = getTransfers(ctx)

// an array of entity class instances
let transfers: Transfer[] = []

for (let t of transfersData) {
let {id, blockNumber, timestamp, extrinsicHash, amount, fee} = t

// join some extra data
let from = getAccount(accounts, t.from)
let to = getAccount(accounts, t.to)

// populate entity class instances
// with the extracted data
transfers.push(new Transfer({
id,
blockNumber,
timestamp,
extrinsicHash,
from,
to,
amount,
fee
}))
}

// persist an array of entities
// in a single statement
await ctx.store.insert(transfers)
})