|
| 1 | +package dbsyncer |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "fmt" |
| 6 | + "strings" |
| 7 | + |
| 8 | + cloudevents "github.com/cloudevents/sdk-go/v2" |
| 9 | + "github.com/go-logr/logr" |
| 10 | + "gorm.io/gorm/clause" |
| 11 | + ctrl "sigs.k8s.io/controller-runtime" |
| 12 | + |
| 13 | + "github.com/stolostron/multicluster-global-hub/manager/pkg/statussyncer/conflator" |
| 14 | + "github.com/stolostron/multicluster-global-hub/pkg/bundle/event" |
| 15 | + eventversion "github.com/stolostron/multicluster-global-hub/pkg/bundle/version" |
| 16 | + "github.com/stolostron/multicluster-global-hub/pkg/database" |
| 17 | + "github.com/stolostron/multicluster-global-hub/pkg/enum" |
| 18 | +) |
| 19 | + |
| 20 | +type managedClusterEventHandler struct { |
| 21 | + log logr.Logger |
| 22 | + eventType string |
| 23 | + eventSyncMode enum.EventSyncMode |
| 24 | + eventPriority conflator.ConflationPriority |
| 25 | +} |
| 26 | + |
| 27 | +func NewManagedClusterEventHandler() conflator.Handler { |
| 28 | + eventType := string(enum.ManagedClusterEventType) |
| 29 | + logName := strings.Replace(eventType, enum.EventTypePrefix, "", -1) |
| 30 | + return &managedClusterEventHandler{ |
| 31 | + log: ctrl.Log.WithName(logName), |
| 32 | + eventType: eventType, |
| 33 | + eventSyncMode: enum.DeltaStateMode, |
| 34 | + eventPriority: conflator.ManagedClusterEventPriority, |
| 35 | + } |
| 36 | +} |
| 37 | + |
| 38 | +func (h *managedClusterEventHandler) RegisterHandler(conflationManager *conflator.ConflationManager) { |
| 39 | + conflationManager.Register(conflator.NewConflationRegistration( |
| 40 | + h.eventPriority, |
| 41 | + h.eventSyncMode, |
| 42 | + h.eventType, |
| 43 | + h.handleEvent, |
| 44 | + )) |
| 45 | +} |
| 46 | + |
| 47 | +func (h *managedClusterEventHandler) handleEvent(ctx context.Context, evt *cloudevents.Event) error { |
| 48 | + version := evt.Extensions()[eventversion.ExtVersion] |
| 49 | + leafHubName := evt.Source() |
| 50 | + h.log.V(2).Info(startMessage, "type", evt.Type(), "LH", evt.Source(), "version", version) |
| 51 | + |
| 52 | + managedClusterEvents := event.ManagedClusterEventBundle{} |
| 53 | + if err := evt.DataAs(&managedClusterEvents); err != nil { |
| 54 | + return err |
| 55 | + } |
| 56 | + |
| 57 | + for _, managedClusterEvent := range managedClusterEvents { |
| 58 | + managedClusterEvent.LeafHubName = leafHubName |
| 59 | + } |
| 60 | + |
| 61 | + if len(managedClusterEvents) <= 0 { |
| 62 | + h.log.Info("empty managed cluster event payload", "event", evt) |
| 63 | + return nil |
| 64 | + } |
| 65 | + |
| 66 | + db := database.GetGorm() |
| 67 | + err := db.Clauses(clause.OnConflict{ |
| 68 | + Columns: []clause.Column{{Name: "leaf_hub_name"}, {Name: "event_name"}, {Name: "created_at"}}, |
| 69 | + DoNothing: true, |
| 70 | + }).CreateInBatches(managedClusterEvents, 100).Error |
| 71 | + if err != nil { |
| 72 | + return fmt.Errorf("failed handling leaf hub LocalPolicyStatusEvent event - %w", err) |
| 73 | + } |
| 74 | + |
| 75 | + h.log.V(2).Info(finishMessage, "type", evt.Type(), "LH", evt.Source(), "version", version) |
| 76 | + return nil |
| 77 | +} |
0 commit comments