Skip to content

Commit

Permalink
chore: formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
vlastahajek committed Sep 30, 2024
1 parent 3f5e253 commit 3f57983
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 24 deletions.
28 changes: 17 additions & 11 deletions examples/Batching/batching.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"fmt"
"math/rand"
"os"
"text/tabwriter"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
"github.com/apache/arrow/go/v15/arrow"
)

const NumPoints = 54

func main() {
// Create a random number generator
r := rand.New(rand.NewSource(456))
Expand Down Expand Up @@ -44,10 +47,10 @@ func main() {
b := batching.NewBatcher(batching.WithSize(5))

// Simulate delay of a second
t := time.Now().Add(-54 * time.Second)
t := time.Now().Add(-NumPoints * time.Second)

// Write 54 points synchronously to the batcher
for i := 0; i < 54; i++ {
// Write points synchronously to the batcher
for range NumPoints {
p := influxdb3.NewPoint("stat",
map[string]string{"location": "Paris"},
map[string]any{
Expand Down Expand Up @@ -82,7 +85,7 @@ func main() {
// Create a batcher with a size of 5, a ready callback and an emit callback to write the batch to the client
b = batching.NewBatcher(
batching.WithSize(5),
batching.WithReadyCallback(func() { fmt.Println("ready") }),
batching.WithReadyCallback(func() { fmt.Println("-- ready --") }),
batching.WithEmitCallback(func(points []*influxdb3.Point) {
err = client.WritePoints(context.Background(), points)
if err != nil {
Expand All @@ -92,10 +95,10 @@ func main() {
)

// Simulate delay of a second
t = time.Now().Add(-54 * time.Second)
t = time.Now().Add(-NumPoints * time.Second)

// Write 54 points synchronously to the batcher
for i := 0; i < 54; i++ {
// Write points synchronously to the batcher
for range NumPoints {
p := influxdb3.NewPoint("stat",
map[string]string{"location": "Madrid"},
map[string]any{
Expand Down Expand Up @@ -131,12 +134,15 @@ func main() {
panic(err)
}

// Use a tabwriter to format the output
w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0)
defer w.Flush()

fmt.Fprintln(w, "\nTime\tLocation\tTemperature\tHumidity")
// Process the data
for iterator.Next() {
value := iterator.Value()
fmt.Printf("%s at %v:\n", value["location"],
(value["time"].(arrow.Timestamp)).ToTime(arrow.Nanosecond).Format(time.RFC822))
fmt.Printf(" temperature: %f\n", value["temperature"])
fmt.Printf(" humidity : %d%%\n", value["humidity"])
t := (value["time"].(arrow.Timestamp)).ToTime(arrow.Nanosecond).Format(time.RFC3339)
fmt.Fprintf(w, "%v\t%s\t%.1f\t%d\n", t, value["location"], value["temperature"], value["humidity"])
}
}
1 change: 1 addition & 0 deletions influxdb3/batching/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

// Package batching provides a batcher to collect points and emit them as batches.
package batching

import (
Expand Down
19 changes: 10 additions & 9 deletions influxdb3/batching/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
package batching

import (
"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/stretchr/testify/assert"
"sync"
"testing"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/stretchr/testify/assert"
)

func TestDefaultValues(t *testing.T) {
Expand Down Expand Up @@ -63,12 +64,12 @@ func TestAddAndEmit(t *testing.T) {
}),
)

for i := 0; i < batchSize; i++ {
for range batchSize {
b.Add(&influxdb3.Point{})
}

assert.True(t, emitted, "Emit callback should have been called")
assert.Equal(t, batchSize, len(emittedPoints), "The emitted batch size should match the expected size")
assert.Len(t, emittedPoints, batchSize, "The emitted batch size should match the expected size")
}

func TestReadyCallback(t *testing.T) {
Expand All @@ -82,7 +83,7 @@ func TestReadyCallback(t *testing.T) {
}),
)

for i := 0; i < batchSize; i++ {
for range batchSize {
b.Add(&influxdb3.Point{})
}

Expand All @@ -106,7 +107,7 @@ func TestPartialEmit(t *testing.T) {
points := b.Emit()

assert.False(t, emitted, "Emit callback should not have been called automatically")
assert.Equal(t, 2, len(points), "Emit should return all points when batch size is not reached")
assert.Len(t, points, 2, "Emit should return all points when batch size is not reached")
}

func TestThreadSafety(t *testing.T) {
Expand All @@ -120,11 +121,11 @@ func TestThreadSafety(t *testing.T) {
}),
)

for i := 0; i < 25; i++ {
for range 25 {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < 4; j++ {
for range 4 {
b.Add(&influxdb3.Point{})
}
}()
Expand All @@ -134,5 +135,5 @@ func TestThreadSafety(t *testing.T) {

points := b.Emit()
assert.Equal(t, 20, emits, "All points should have been emitted")
assert.Equal(t, 0, len(points), "Remaining points should be emitted correctly")
assert.Empty(t, points, "Remaining points should be emitted correctly")
}
9 changes: 5 additions & 4 deletions influxdb3/batching/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ package batching_test
import (
"context"
"fmt"
"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
"log"
"math/rand"
"time"

"github.com/InfluxCommunity/influxdb3-go/influxdb3"
"github.com/InfluxCommunity/influxdb3-go/influxdb3/batching"
)

func Example_batcher() {
Expand All @@ -54,7 +55,7 @@ func Example_batcher() {
t := time.Now().Add(-54 * time.Second)

// Write 54 points synchronously to the batcher
for i := 0; i < 54; i++ {
for range 54 {
p := influxdb3.NewPoint("stat",
map[string]string{"location": "Paris"},
map[string]any{
Expand Down Expand Up @@ -101,7 +102,7 @@ func Example_batcher() {
t = time.Now().Add(-54 * time.Second)

// Write 54 points synchronously to the batcher
for i := 0; i < 54; i++ {
for range 54 {
p := influxdb3.NewPoint("stat",
map[string]string{"location": "Madrid"},
map[string]any{
Expand Down

0 comments on commit 3f57983

Please sign in to comment.