diff --git a/CHANGELOG.md b/CHANGELOG.md
index eb9efd6..4ebc2f1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 ## 0.12.0 [unreleased]
 
+### Features
+
+1. [#107](https://github.com/InfluxCommunity/influxdb3-go/pull/107): Added `Batcher` to simplify the process of writing data in batches.
+
 ## 0.11.0 [2024-09-27]
 
 ### Bug Fixes
diff --git a/examples/Batching/batching.go b/examples/Batching/batching.go
new file mode 100644
index 0000000..f503667
--- /dev/null
+++ b/examples/Batching/batching.go
@@ -0,0 +1,142 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"os"
+	"time"
+
+	"github.com/InfluxCommunity/influxdb3-go/influxdb3"
+	"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
+	"github.com/apache/arrow/go/v15/arrow"
+)
+
+func main() {
+	// Create a random number generator; the fixed seed makes every run
+	// produce the same sample values
+	r := rand.New(rand.NewSource(456))
+	// Retrieve credentials from environment variables.
+	url := os.Getenv("INFLUX_URL")
+	token := os.Getenv("INFLUX_TOKEN")
+	database := os.Getenv("INFLUX_DATABASE")
+
+	// Instantiate a client using your credentials.
+	client, err := influxdb3.New(influxdb3.ClientConfig{
+		Host:     url,
+		Token:    token,
+		Database: database,
+	})
+	if err != nil {
+		panic(err)
+	}
+
+	// Close the client when finished and raise any errors.
+	defer func() {
+		if err := client.Close(); err != nil {
+			panic(err)
+		}
+	}()
+
+	// Synchronous use
+
+	// Create a Batcher with a size of 5
+	b := batching.NewBatcher(batching.WithSize(5))
+
+	// Start 54 seconds in the past so that the generated points are spaced
+	// one second apart and end just before the current time
+	t := time.Now().Add(-54 * time.Second)
+
+	// Add 54 points to the batcher and write every full batch to the client
+	for i := 0; i < 54; i++ {
+		p := influxdb3.NewPoint("stat",
+			map[string]string{"location": "Paris"},
+			map[string]any{
+				"temperature": 15 + r.Float64()*20,
+				"humidity":    30 + r.Int63n(40),
+			},
+			t)
+
+		// Add the point to the batcher
+		b.Add(p)
+
+		// Update time
+		t = t.Add(time.Second)
+
+		// If the batcher is ready, take the batch out of the batcher and write it to the client
+		if b.Ready() {
+			if err := client.WritePoints(context.Background(), b.Emit()); err != nil {
+				panic(err)
+			}
+		}
+	}
+
+	// Write the final, partially filled batch to the client
+	if err := client.WritePoints(context.Background(), b.Emit()); err != nil {
+		panic(err)
+	}
+
+	// Asynchronous use
+
+	// Create a batcher with a size of 5, a ready callback and an emit callback to write the batch to the client
+	b = batching.NewBatcher(
+		batching.WithSize(5),
+		batching.WithReadyCallback(func() { fmt.Println("ready") }),
+		batching.WithEmitCallback(func(points []*influxdb3.Point) {
+			// Keep the error local: the callback must not overwrite main's err
+			if err := client.WritePoints(context.Background(), points); err != nil {
+				panic(err)
+			}
+		}),
+	)
+
+	// Start 54 seconds in the past so that the generated points are spaced
+	// one second apart and end just before the current time
+	t = time.Now().Add(-54 * time.Second)
+
+	// Add 54 points to the batcher; whenever a batch is full, the emit
+	// callback above writes it to the client
+	for i := 0; i < 54; i++ {
+		p := influxdb3.NewPoint("stat",
+			map[string]string{"location": "Madrid"},
+			map[string]any{
+				"temperature": 15 + r.Float64()*20,
+				"humidity":    30 + r.Int63n(40),
+			},
+			t)
+
+		// Add the point to the batcher
+		b.Add(p)
+
+		// Update time
+		t = t.Add(time.Second)
+	}
+
+	// Write the final, partially filled batch to the client
+	if err := client.WritePoints(context.Background(), b.Emit()); err != nil {
+		panic(err)
+	}
+
+	// Prepare an SQL query
+	query := `
+    SELECT *
+    FROM stat
+    WHERE time >= now() - interval '5 minutes'
+    AND location IN ('Paris', 'Madrid')
+  `
+
+	// Run the query
+	iterator, err := client.Query(context.Background(), query)
+	if err != nil {
+		panic(err)
+	}
+
+	// Process the data
+	for iterator.Next() {
+		value := iterator.Value()
+		fmt.Printf("%s at %v:\n", value["location"],
+			(value["time"].(arrow.Timestamp)).ToTime(arrow.Nanosecond).Format(time.RFC822))
+		fmt.Printf(" temperature: %f\n", value["temperature"])
+		fmt.Printf(" humidity : %d%%\n", value["humidity"])
+	}
+}
diff --git a/examples/README.md b/examples/README.md
index e2c5ddf..8010c0f 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -3,3 +3,4 @@
 - [Write and query data](Basic/basic.go) - A complete Go example that demonstrates the different ways of writing data, and then queries your data stored in InfluxDB v3 (formerly InfluxDB IOx).
 - [Downsampling](Downsampling/downsampling.go) - A complete Go example that uses a downsampling query and then writes downsampled data back to a different table.
 - [HTTP Error Handling](HTTPErrorHandled/httpErrorHandled.go) - A complete Go example for reading HTTP headers in case of an server error occurs.
+- [Batching write](Batching/batching.go) - A complete Go example that demonstrates how to write data in batches.
diff --git a/influxdb3/batching/batcher.go b/influxdb3/batching/batcher.go
new file mode 100644
index 0000000..2d32e32
--- /dev/null
+++ b/influxdb3/batching/batcher.go
@@ -0,0 +1,152 @@
+/*
+The MIT License
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+
+package batching
+
+import (
+	"sync"
+
+	"github.com/InfluxCommunity/influxdb3-go/influxdb3"
+)
+
+// Option is a functional option: it receives the Batcher under construction
+// and adjusts one of its settings.
+type Option func(*Batcher)
+
+// WithSize returns an Option that sets the batch size, i.e. the number of
+// buffered points at which a batch counts as ready and the maximum number of
+// points handed out per emitted batch.
+func WithSize(size int) Option {
+	return func(batcher *Batcher) { batcher.size = size }
+}
+
+// WithCapacity returns an Option that sets the initial capacity of the
+// batcher's internal points buffer.
+func WithCapacity(capacity int) Option {
+	return func(batcher *Batcher) { batcher.capacity = capacity }
+}
+
+// WithReadyCallback sets the function called when a new batch is ready. The
+// batcher will wait for the callback to finish, so please return as fast as
+// possible and move long-running processing to a go-routine.
+func WithReadyCallback(f func()) Option {
+	return func(batcher *Batcher) { batcher.callbackReady = f }
+}
+
+// WithEmitCallback returns an Option that registers f to receive each batch
+// of points as soon as it is ready. Add invokes f synchronously while holding
+// the batcher's lock, so f should return quickly (hand long-running work to a
+// goroutine) and must not call methods of the same batcher.
+func WithEmitCallback(f func([]*influxdb3.Point)) Option {
+	return func(batcher *Batcher) { batcher.callbackEmit = f }
+}
+
+const (
+	// DefaultBatchSize is the default number of points emitted
+	DefaultBatchSize = 1000
+
+	// DefaultCapacity is the default initial capacity of the point buffer
+	DefaultCapacity = 2 * DefaultBatchSize
+)
+
+// Batcher collects points and emits them as batches of a configurable size.
+type Batcher struct {
+	size     int // number of points per batch
+	capacity int // initial capacity of the points buffer
+
+	callbackReady func()
+	callbackEmit  func([]*influxdb3.Point)
+
+	// points buffers the added points; Add, Ready and Emit access it under the embedded Mutex
+	points []*influxdb3.Point
+	sync.Mutex
+}
+
+// NewBatcher creates and initializes a new Batcher instance applying the
+// specified options. By default, a batch-size is DefaultBatchSize and the
+// initial capacity is DefaultCapacity.
+func NewBatcher(options ...Option) *Batcher {
+	// Start from the defaults and let the options override them
+	b := &Batcher{size: DefaultBatchSize, capacity: DefaultCapacity}
+	for _, o := range options {
+		o(b)
+	}
+
+	// Repair unusable settings: a size below one would never hand out a point
+	// (or panic on a negative slice bound); make panics on a negative capacity.
+	if b.size <= 0 {
+		b.size = DefaultBatchSize
+	}
+	b.capacity = max(b.capacity, 0)
+
+	b.points = make([]*influxdb3.Point, 0, b.capacity)
+	return b
+}
+
+// Add appends p to the batcher. When that completes a batch, the ready and then
+// the emit callback (each only if set) run before Add returns, while the batcher
+// is locked; a callback must therefore not call Add, Ready or Emit on b.
+func (b *Batcher) Add(p *influxdb3.Point) {
+	b.Lock()
+	defer b.Unlock()
+
+	b.points = append(b.points, p)
+	if b.isReady() {
+		if b.callbackReady != nil {
+			b.callbackReady()
+		}
+		if b.callbackEmit != nil {
+			b.callbackEmit(b.emitPoints())
+		}
+	}
+}
+
+// Ready reports whether a full batch is ready to be emitted
+func (b *Batcher) Ready() bool {
+	b.Lock()
+	defer b.Unlock()
+	return b.isReady()
+}
+
+// isReady is Ready without locking; the caller must hold the lock
+func (b *Batcher) isReady() bool {
+	return len(b.points) >= b.size
+}
+
+// Emit returns a new batch of points with the provided batch size or with the
+// remaining points. Please drain the points at the end of your processing to
+// get the remaining points not filling up a batch.
+func (b *Batcher) Emit() []*influxdb3.Point {
+	b.Lock()
+	defer b.Unlock()
+	return b.emitPoints()
+}
+
+// emitPoints removes up to b.size points from the front of the buffer and
+// returns them as one batch. The caller must hold the lock.
+func (b *Batcher) emitPoints() []*influxdb3.Point {
+	l := min(b.size, len(b.points))
+	// Cap the batch at its length so that a caller's append cannot overwrite the points still buffered
+	points := b.points[:l:l]
+	b.points = b.points[l:]
+	return points
+}
diff --git a/influxdb3/batching/batcher_test.go b/influxdb3/batching/batcher_test.go
new file mode 100644
index 0000000..775659e
--- /dev/null
+++ b/influxdb3/batching/batcher_test.go
@@ -0,0 +1,138 @@
+/*
+ The MIT License
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
+*/
+
+package batching
+
+import (
+	"sync"
+	"testing"
+
+	"github.com/InfluxCommunity/influxdb3-go/influxdb3"
+	"github.com/stretchr/testify/assert"
+)
+
+// TestDefaultValues checks that a batcher created without options uses the defaults.
+func TestDefaultValues(t *testing.T) {
+	b := NewBatcher()
+
+	assert.Equal(t, DefaultBatchSize, b.size)
+	assert.Equal(t, DefaultCapacity, cap(b.points))
+}
+
+// TestCustomValues checks that WithSize and WithCapacity override the defaults.
+func TestCustomValues(t *testing.T) {
+	const (
+		batchSize = 10
+		capacity  = 100
+	)
+
+	b := NewBatcher(WithSize(batchSize), WithCapacity(capacity))
+
+	assert.Equal(t, batchSize, b.size)
+	assert.Equal(t, capacity, cap(b.points))
+}
+
+// TestAddAndEmit checks that filling a batch triggers the emit callback with that batch.
+func TestAddAndEmit(t *testing.T) {
+	const batchSize = 5
+
+	var emitted bool
+	var emittedPoints []*influxdb3.Point
+	b := NewBatcher(
+		WithSize(batchSize),
+		WithEmitCallback(func(points []*influxdb3.Point) {
+			emitted, emittedPoints = true, points
+		}),
+	)
+
+	for i := 0; i < batchSize; i++ {
+		b.Add(&influxdb3.Point{})
+	}
+
+	assert.True(t, emitted, "Emit callback should have been called")
+	assert.Equal(t, batchSize, len(emittedPoints), "The emitted batch size should match the expected size")
+}
+
+// TestReadyCallback checks that filling a batch triggers the ready callback.
+func TestReadyCallback(t *testing.T) {
+	const batchSize = 5
+
+	var readyCalled bool
+	b := NewBatcher(
+		WithSize(batchSize),
+		WithReadyCallback(func() { readyCalled = true }),
+	)
+
+	for i := 0; i < batchSize; i++ {
+		b.Add(&influxdb3.Point{})
+	}
+
+	assert.True(t, readyCalled, "Ready callback should have been called when the batch is full")
+}
+
+// TestPartialEmit checks that Emit returns an incomplete batch and that the
+// emit callback is not triggered before the batch size is reached.
+func TestPartialEmit(t *testing.T) {
+	const batchSize = 5
+
+	var emitted bool
+	b := NewBatcher(
+		WithSize(batchSize),
+		WithEmitCallback(func([]*influxdb3.Point) { emitted = true }),
+	)
+
+	b.Add(&influxdb3.Point{})
+	b.Add(&influxdb3.Point{})
+	points := b.Emit()
+
+	assert.False(t, emitted, "Emit callback should not have been called automatically")
+	assert.Equal(t, 2, len(points), "Emit should return all points when batch size is not reached")
+}
+
+// TestThreadSafety adds points from several goroutines at once and checks that
+// every full batch reached the emit callback. The callback runs while the
+// batcher is locked, so the plain counter needs no extra synchronization.
+func TestThreadSafety(t *testing.T) {
+	const batchSize = 5
+
+	emits := 0
+	b := NewBatcher(
+		WithSize(batchSize),
+		WithEmitCallback(func([]*influxdb3.Point) { emits++ }),
+	)
+
+	var wg sync.WaitGroup
+	wg.Add(25)
+	for i := 0; i < 25; i++ {
+		go func() {
+			defer wg.Done()
+			for j := 0; j < 4; j++ {
+				b.Add(&influxdb3.Point{})
+			}
+		}()
+	}
+	wg.Wait()
+
+	points := b.Emit()
+	assert.Equal(t, 20, emits, "All points should have been emitted")
+	assert.Equal(t, 0, len(points), "Remaining points should be emitted correctly")
+}
diff --git a/influxdb3/batching/example_test.go b/influxdb3/batching/example_test.go
new file mode 100644
index 0000000..d93b1dc
--- /dev/null
+++ b/influxdb3/batching/example_test.go
@@ -0,0 +1,124 @@
+/*
+ The MIT License
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in
+ all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ THE SOFTWARE.
+*/
+
+package batching_test
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"math/rand"
+	"time"
+
+	"github.com/InfluxCommunity/influxdb3-go/influxdb3"
+	"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
+)
+
+func Example_batcher() {
+	// Create a random number generator
+	r := rand.New(rand.NewSource(456))
+
+	// Instantiate a client using your credentials.
+	client, err := influxdb3.NewFromEnv()
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Close the client when finished and raise any errors.
+	defer func() {
+		if err := client.Close(); err != nil {
+			log.Fatal(err)
+		}
+	}()
+
+	// Synchronous use
+
+	// Create a Batcher with a size of 5
+	b := batching.NewBatcher(batching.WithSize(5))
+
+	// Start 54 seconds in the past so that the points are one second apart
+	t := time.Now().Add(-54 * time.Second)
+
+	// Add 54 points to the batcher and write every full batch to the client
+	for i := 0; i < 54; i++ {
+		p := influxdb3.NewPoint("stat",
+			map[string]string{"location": "Paris"},
+			map[string]any{
+				"temperature": 15 + r.Float64()*20,
+				"humidity":    30 + r.Int63n(40),
+			},
+			t)
+
+		// Add the point to the batcher and advance the timestamp by one second
+		b.Add(p)
+		t = t.Add(time.Second)
+
+		// If the batcher is ready, take the batch out of the batcher and write it to the client
+		if b.Ready() {
+			if err := client.WritePoints(context.Background(), b.Emit()); err != nil {
+				log.Fatal(err)
+			}
+		}
+	}
+
+	// Write the final, partially filled batch to the client
+	if err := client.WritePoints(context.Background(), b.Emit()); err != nil {
+		log.Fatal(err)
+	}
+
+	// Asynchronous use
+
+	// Create a batcher with a size of 5, a ready callback and an emit callback to write the batch to the client
+	b = batching.NewBatcher(
+		batching.WithSize(5),
+		batching.WithReadyCallback(func() { fmt.Println("ready") }),
+		batching.WithEmitCallback(func(points []*influxdb3.Point) {
+			// Keep the error local: the callback must not overwrite the outer err
+			if err := client.WritePoints(context.Background(), points); err != nil {
+				log.Fatal(err)
+			}
+		}),
+	)
+
+	// Start 54 seconds in the past so that the points are one second apart
+	t = time.Now().Add(-54 * time.Second)
+
+	// Add 54 points to the batcher; the emit callback writes every full batch to the client
+	for i := 0; i < 54; i++ {
+		p := influxdb3.NewPoint("stat",
+			map[string]string{"location": "Madrid"},
+			map[string]any{
+				"temperature": 15 + r.Float64()*20,
+				"humidity":    30 + r.Int63n(40),
+			},
+			t)
+
+		// Add the point to the batcher and advance the timestamp by one second
+		b.Add(p)
+		t = t.Add(time.Second)
+	}
+
+	// Write the final, partially filled batch to the client
+	if err := client.WritePoints(context.Background(), b.Emit()); err != nil {
+		log.Fatal(err)
+	}
+}