Bulk indexing
When ingesting many documents, use the Bulk API to send multiple operations in a single request. For the full bulk API reference, see the Bulk API documentation. For raw-NDJSON bulk requests, see the low-level API.
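Under the hood, a bulk request body is newline-delimited JSON (NDJSON): each operation is an action line, followed (for index, create, and update) by the document source, every line terminated by a newline. As a rough sketch of the payload the client assembles on the wire (the `buildBulkBody` helper and its types are illustrative, not part of the client API):

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// actionMeta and action model a single bulk "index" action line.
type actionMeta struct {
	Index string `json:"_index"`
	ID    string `json:"_id"`
}
type action struct {
	Index actionMeta `json:"index"`
}

// doc pairs a document ID with its source.
type doc struct {
	ID     string
	Source any
}

// buildBulkBody assembles an NDJSON payload: for each document, one
// action line followed by one source line, each terminated by '\n'.
func buildBulkBody(index string, docs []doc) (string, error) {
	var b strings.Builder
	for _, d := range docs {
		actionLine, err := json.Marshal(action{Index: actionMeta{Index: index, ID: d.ID}})
		if err != nil {
			return "", err
		}
		srcLine, err := json.Marshal(d.Source)
		if err != nil {
			return "", err
		}
		b.Write(actionLine)
		b.WriteByte('\n')
		b.Write(srcLine)
		b.WriteByte('\n')
	}
	return b.String(), nil
}

func main() {
	body, err := buildBulkBody("my-index", []doc{
		{ID: "1", Source: map[string]string{"title": "Test 1"}},
	})
	if err != nil {
		panic(err)
	}
	fmt.Print(body)
	// Prints the action line, then the source line:
	// {"index":{"_index":"my-index","_id":"1"}}
	// {"title":"Test 1"}
}
```

The typed bulk builder and `esutil.BulkIndexer` below produce this format for you; you only hand them operations and document bodies.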
With the typed client, build a bulk request by appending operations and then executing it:
```go
index := "my-index"
id1 := "1"
id2 := "2"

bulk := es.Bulk()

if err := bulk.IndexOp(
	types.IndexOperation{Index_: &index, Id_: &id1},
	map[string]any{"title": "Test 1"},
); err != nil {
	// Handle error.
}
if err := bulk.IndexOp(
	types.IndexOperation{Index_: &index, Id_: &id2},
	map[string]any{"title": "Test 2"},
); err != nil {
	// Handle error.
}

res, err := bulk.Do(context.Background())
if err != nil {
	// Handle error.
}
if res.Errors {
	// One or more operations failed.
}
```
- Create a bulk request builder.
- Append index operations with metadata and document body.
- Execute the bulk request.
The client repository contains complete, runnable examples for bulk ingestion (typed bulk, esutil.BulkIndexer, benchmarks, Kafka ingestion): _examples/bulk.
For a higher-level API that takes care of batching, flushing, and concurrency, use the `esutil.BulkIndexer` helper. `BulkIndexer` accepts any client that implements the transport interface (`Perform(*http.Request) (*http.Response, error)`), so it works with both `*elasticsearch.TypedClient` and `*elasticsearch.Client`.
The `BulkIndexer` is designed to be long-lived: create it once, keep adding items over time (potentially from multiple goroutines), and call `Close()` once when you are done (for example with `defer`).
```go
client, err := elasticsearch.NewTyped()
if err != nil {
	// Handle error.
}
defer func() {
	if err := client.Close(context.Background()); err != nil {
		// Handle error.
	}
}()

ctx := context.Background()

indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
	Client:     client,
	Index:      "test",
	NumWorkers: 4,
	FlushBytes: 5_000_000,
})
if err != nil {
	// Handle error.
}
defer func() {
	if err := indexer.Close(ctx); err != nil {
		// Handle error.
	}
}()

_ = indexer.Add(ctx, esutil.BulkIndexerItem{
	Action:     "index",
	DocumentID: "1",
	Body:       strings.NewReader(`{"title":"Test"}`),
})
```
- Any client that implements the transport interface. Here we pass a `*elasticsearch.TypedClient`; `*elasticsearch.Client` works identically.
- The default index name for all items.
- Number of concurrent worker goroutines.
- Flush threshold in bytes (flush when the buffer reaches this size).
BulkIndexerConfig full reference
The esutil.BulkIndexerConfig struct supports the following fields:
| Field | Type | Description |
|---|---|---|
| `Client` | `esapi.Transport` | Any transport that implements `Perform` (required). |
| `Index` | `string` | Default index name for items that don't specify one. |
| `NumWorkers` | `int` | Number of concurrent worker goroutines (default: number of CPUs). |
| `FlushBytes` | `int` | Flush threshold in bytes. |
| `FlushInterval` | `time.Duration` | Periodic flush interval. |
| `FlushJitter` | `time.Duration` | Max random jitter added to `FlushInterval` per worker to avoid lockstep flushes. Default: 0 (disabled). |
| `Pipeline` | `string` | Default ingest pipeline for all items. |
| `Refresh` | `string` | Refresh policy after each flush (`"true"`, `"false"`, `"wait_for"`). |
| `Routing` | `string` | Default routing value for all items. |
| `Timeout` | `time.Duration` | Timeout for each bulk request. |
| `OnError` | `func(context.Context, error)` | Callback invoked when a bulk request fails. |
| `OnFlushStart` | `func(context.Context) context.Context` | Callback invoked before each flush. |
| `OnFlushEnd` | `func(context.Context)` | Callback invoked after each flush. |
| `Decoder` | `BulkResponseJSONDecoder` | Custom JSON decoder for bulk responses. |
| `DebugLogger` | `BulkIndexerDebugLogger` | Logger for debug output. |
In addition to automatic flushing (based on FlushBytes and FlushInterval), you can explicitly flush the indexer at any time using Flush(). This drains all currently queued items, flushes all worker buffers to Elasticsearch, and waits for completion. The indexer remains usable after Flush(); you can continue adding items.
This is useful when you need to ensure all pending documents are sent before proceeding, for example after ingesting a batch of records or before reading back recently indexed data.
```go
// Add items to the indexer.
for _, doc := range documents {
	_ = indexer.Add(ctx, esutil.BulkIndexerItem{
		Action: "index",
		Body:   strings.NewReader(doc),
		OnSuccess: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
			log.Printf("Indexed %s: %s", item.DocumentID, res.Result)
		},
		OnFailure: func(_ context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
			log.Printf("Failed %s: %s", item.DocumentID, err)
		},
	})
}

// Flush: send all pending items to Elasticsearch and wait for completion.
if err := indexer.Flush(ctx); err != nil {
	// Handle error.
}
// The indexer is still usable; keep adding items or close it when done.
```
Flush() follows the same callback model as automatic flushes: per-item results are delivered through the OnSuccess and OnFailure callbacks on each BulkIndexerItem. Flush itself only returns an error for transport-level failures or context cancellation.
Note: Do not call `Close()` and recreate the indexer to achieve a synchronous flush. The `BulkIndexer` is designed to be long-lived. Use `Flush()` instead, and call `Close()` only once, when you are done with the indexer entirely.