Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kuma-cp): deduplicate dataplane inbounds by address and port combination #12760

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions pkg/plugins/runtime/k8s/controllers/inbound_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,29 @@ func inboundForServiceless(zone string, pod *kube_core.Pod, name string, nodeLab
}
}

// Deprecated: LegacyInboundInterfacesFor is currently only used for delegated gateway and Mesh without MeshService exclusive
// to not change order of inbounds.
// For gateway we pick first inbound to take tags from. Delegated gateway identity relies on this.
// For Dataplanes when MeshService is disabled we base identity and routing
// TODO: We should revisit this when we rework identity. More in https://github.com/kumahq/kuma/issues/3339
func (i *InboundConverter) LegacyInboundInterfacesFor(ctx context.Context, zone string, pod *kube_core.Pod, services []*kube_core.Service) ([]*mesh_proto.Dataplane_Networking_Inbound, error) {
return i.inboundInterfacesFor(ctx, zone, pod, services)
}

// InboundInterfacesFor should be used when MeshService mode is Exclusive
func (i *InboundConverter) InboundInterfacesFor(ctx context.Context, zone string, pod *kube_core.Pod, services []*kube_core.Service) ([]*mesh_proto.Dataplane_Networking_Inbound, error) {
inbounds, err := i.inboundInterfacesFor(ctx, zone, pod, services)
if err != nil {
return inbounds, err
}

// Right now we will build multiple inbounds for each service selecting port, but later on
// we will only create one listener for last inbound. Ignoring other inbounds from dataplane.
// Because of this we can safely deduplicate them here. This needs to change when we get rid of kuma.io/service
return deduplicateInboundsByAddressAndPort(inbounds), nil
}

func (i *InboundConverter) inboundInterfacesFor(ctx context.Context, zone string, pod *kube_core.Pod, services []*kube_core.Service) ([]*mesh_proto.Dataplane_Networking_Inbound, error) {
nodeLabels, err := i.getNodeLabelsToCopy(ctx, pod.Spec.NodeName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -161,6 +183,23 @@ func (i *InboundConverter) InboundInterfacesFor(ctx context.Context, zone string
return ifaces, nil
}

func deduplicateInboundsByAddressAndPort(ifaces []*mesh_proto.Dataplane_Networking_Inbound) []*mesh_proto.Dataplane_Networking_Inbound {
inboundKey := func(iface *mesh_proto.Dataplane_Networking_Inbound) string {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In one of my testing, I found when the service port is different than the targetPort, the two inbounds generated on the DP has the same name and port, but they have different k8s.kuma.io/service-port tags: one of them is incorrect. So in this case, we need to remove the one with incorrect tag. Could you also verify this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean the situation when we have service with two ports?
With this approach we don't care about tags, inbound tags will be gone after we fully switch to MeshService exclusive. I think in this situation we will just create inbound from pod port

return fmt.Sprintf("%s:%d", iface.Address, iface.Port)
}

var deduplicatedInbounds []*mesh_proto.Dataplane_Networking_Inbound
inboundsPerAddressPort := map[string]*mesh_proto.Dataplane_Networking_Inbound{}
for _, iface := range ifaces {
if inboundsPerAddressPort[inboundKey(iface)] == nil {
inboundsPerAddressPort[inboundKey(iface)] = iface
deduplicatedInbounds = append(deduplicatedInbounds, iface)
}
}

return deduplicatedInbounds
}

func (i *InboundConverter) getNodeLabelsToCopy(ctx context.Context, nodeName string) (map[string]string, error) {
if len(i.NodeLabelsToCopy) == 0 || nodeName == "" {
return map[string]string{}, nil
Expand Down
13 changes: 12 additions & 1 deletion pkg/plugins/runtime/k8s/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
"github.com/kumahq/kuma/pkg/core/resources/model"
k8s_common "github.com/kumahq/kuma/pkg/plugins/common/k8s"
"github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/api/v1alpha1"
mesh_k8s "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/api/v1alpha1"
k8s_model "github.com/kumahq/kuma/pkg/plugins/resources/k8s/native/pkg/model"
"github.com/kumahq/kuma/pkg/plugins/runtime/k8s/metadata"
Expand Down Expand Up @@ -317,14 +318,24 @@ func (r *PodReconciler) createOrUpdateDataplane(
services []*kube_core.Service,
others []*mesh_k8s.Dataplane,
) error {
meshName := util_k8s.MeshOfByLabelOrAnnotation(r.Log, pod, ns)
k8sMesh := v1alpha1.Mesh{}
if err := r.Client.Get(ctx, kube_types.NamespacedName{Name: meshName}, &k8sMesh); err != nil {
return err
}
mesh := core_mesh.NewMeshResource()
if err := r.ResourceConverter.ToCoreResource(&k8sMesh, mesh); err != nil {
return err
}

dataplane := &mesh_k8s.Dataplane{
ObjectMeta: kube_meta.ObjectMeta{
Namespace: pod.Namespace,
Name: pod.Name,
},
}
operationResult, err := kube_controllerutil.CreateOrUpdate(ctx, r.Client, dataplane, func() error {
if err := r.PodConverter.PodToDataplane(ctx, dataplane, pod, ns, services, others); err != nil {
if err := r.PodConverter.PodToDataplane(ctx, dataplane, pod, ns, services, others, mesh.Spec.MeshServicesMode()); err != nil {
return errors.Wrap(err, "unable to translate a Pod into a Dataplane")
}
if err := kube_controllerutil.SetControllerReference(pod, dataplane, r.Scheme); err != nil {
Expand Down
Loading
Loading