Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion apps/baseline/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from django.contrib.gis.db import models
from django.core.exceptions import ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db.models import F
from django.db.models import F, Q
from django.utils.translation import gettext_lazy as _
from model_utils.managers import InheritanceManager

Expand All @@ -18,6 +18,7 @@
ClassifiedProduct,
Country,
Currency,
SearchQueryMixin,
UnitOfMeasure,
UnitOfMeasureConversion,
)
Expand Down Expand Up @@ -62,6 +63,28 @@ class ExtraMeta:
identifier = ["name"]


class LivelihoodZoneQuerySet(SearchQueryMixin, models.QuerySet):
"""
Searchable LivelihoodZones
"""

def get_search_filter(self, search_term):
return (
Q(code__iexact=search_term)
| Q(alternate_code__iexact=search_term)
| Q(name_en__icontains=search_term)
| Q(name_pt__icontains=search_term)
| Q(name_es__icontains=search_term)
| Q(name_fr__icontains=search_term)
| Q(name_ar__icontains=search_term)
| Q(description_en__icontains=search_term)
| Q(description_pt__icontains=search_term)
| Q(description_es__icontains=search_term)
| Q(description_fr__icontains=search_term)
| Q(description_ar__icontains=search_term)
)


class LivelihoodZone(common_models.Model):
"""
A geographical area within a Country in which people share broadly the same
Expand Down Expand Up @@ -98,6 +121,7 @@ class LivelihoodZone(common_models.Model):
name = TranslatedField(common_models.NameField(max_length=200, unique=True))
description = TranslatedField(common_models.DescriptionField())
country = models.ForeignKey(Country, verbose_name=_("Country"), db_column="country_code", on_delete=models.PROTECT)
objects = common_models.IdentifierManager.from_queryset(LivelihoodZoneQuerySet)()

class Meta:
verbose_name = _("Livelihood Zone")
Expand Down
134 changes: 134 additions & 0 deletions apps/baseline/tests/test_viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
from baseline.models import LivelihoodZoneBaseline
from common.fields import translation_fields
from common.tests.factories import ClassifiedProductFactory, CountryFactory
from metadata.tests.factories import (
LivelihoodCategoryFactory,
WealthCharacteristicFactory,
)

from .factories import (
BaselineLivelihoodActivityFactory,
Expand Down Expand Up @@ -536,6 +540,136 @@ def test_population_estimate_range_filter(self):
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.data), 0)

def test_filter_by_product(self):
product = ClassifiedProductFactory(
cpc="K0111",
description_en="my product",
common_name_en="common",
kcals_per_unit=550,
aliases=["test alias"],
)
ClassifiedProductFactory(cpc="K01111")
baseline = LivelihoodZoneBaselineFactory()
LivelihoodStrategyFactory(product=product, livelihood_zone_baseline=baseline)
response = self.client.get(self.url, {"product": "K011"})
self.assertEqual(response.status_code, 200)
self.assertEqual(len(json.loads(response.content)), 1)
# filter by cpc
response = self.client.get(self.url, {"product": "K0111"})
self.assertEqual(response.status_code, 200)
self.assertEqual(len(json.loads(response.content)), 1)
# filter by cpc startswith
response = self.client.get(self.url, {"product": "K01111"})
self.assertEqual(response.status_code, 200)
self.assertEqual(len(json.loads(response.content)), 0)
# filter by description icontains
response = self.client.get(self.url, {"product": "my"})
self.assertEqual(response.status_code, 200)
self.assertEqual(len(json.loads(response.content.decode("utf-8"))), 1)
# filter by description
response = self.client.get(self.url, {"product": "my product"})
self.assertEqual(response.status_code, 200)
self.assertEqual(len(json.loads(response.content.decode("utf-8"))), 1)
# filter by alias
response = self.client.get(self.url, {"product": "test"})
self.assertEqual(response.status_code, 200)
self.assertEqual(len(json.loads(response.content.decode("utf-8"))), 1)

def test_filter_by_wealth_characteristic(self):
baseline = LivelihoodZoneBaselineFactory()
wealth_characteristic = WealthCharacteristicFactory()
WealthGroupCharacteristicValueFactory(
wealth_group__livelihood_zone_baseline=baseline, wealth_characteristic=wealth_characteristic
)
response = self.client.get(self.url, {"wealth_characteristic": wealth_characteristic.code})
self.assertEqual(response.status_code, 200)
self.assertEqual(len(json.loads(response.content.decode("utf-8"))), 1)
response = self.client.get(self.url, {"wealth_characteristic": wealth_characteristic.name})
self.assertEqual(response.status_code, 200)
self.assertEqual(len(response.json()), 1)


class LivelihoodBaselineFacetedSearchViewTestCase(APITestCase):
def setUp(self):
self.category1 = LivelihoodCategoryFactory()
self.baseline1 = LivelihoodZoneBaselineFactory(main_livelihood_category=self.category1)
self.baseline2 = LivelihoodZoneBaselineFactory(main_livelihood_category=self.category1)
self.baseline3 = LivelihoodZoneBaselineFactory()
self.product1 = ClassifiedProductFactory(
cpc="K0111",
description_en="my test",
common_name_en="common",
kcals_per_unit=550,
aliases=["test alias"],
)
self.product2 = ClassifiedProductFactory(
cpc="L0111",
description_en="my mukera",
common_name_en="common mukera",
kcals_per_unit=550,
)
LivelihoodStrategyFactory(product=self.product1, livelihood_zone_baseline=self.baseline1)
self.characteristic1 = WealthCharacteristicFactory(description_en="my test")
self.characteristic2 = WealthCharacteristicFactory(description_en="my mukera", description_fr="my test")
WealthGroupCharacteristicValueFactory(
wealth_group__livelihood_zone_baseline=self.baseline1, wealth_characteristic=self.characteristic1
)
WealthGroupCharacteristicValueFactory(
wealth_group__livelihood_zone_baseline=self.baseline2, wealth_characteristic=self.characteristic2
)
self.characteristic3 = WealthCharacteristicFactory()
self.strategy = LivelihoodStrategyFactory(product=self.product1)
self.baseline = LivelihoodZoneBaselineFactory(main_livelihood_category=self.category1)
self.url = reverse("livelihood-baseline-faceted-search")

def test_search_with_product(self):
# Test when search matches entries
response = self.client.get(self.url, {"search": self.product1.description_en, "language": "en"})
self.assertEqual(response.status_code, 200)
data = response.data
self.assertEqual(len(data["products"]), 1)
self.assertEqual(data["products"][0]["count"], 2) # 2 zones have this proudct
# Search by the second product
response = self.client.get(
self.url,
{
"search": self.product2.description_en,
},
)
self.assertEqual(response.status_code, 200)
data = response.data
self.assertEqual(len(data["products"]), 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extend these unit tests so that they finish by using the filter and checking that the number of results matches the facet count?

I.e. call /api/livelihoodzonebaseline.json?product=XXX and check len(response.json)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the values from data["products"][0]["filter"], etc. to create the url.


def test_search_with_wealth_characterstics(self):
# Test when search matches entries
response = self.client.get(self.url, {"search": self.characteristic1.description_en})
self.assertEqual(response.status_code, 200)
data = response.data
self.assertEqual(len(data["items"]), 2)
self.assertEqual(data["items"][0]["count"], 1) # 1 zone for this characteristic
self.assertEqual(data["items"][1]["count"], 1) # 1 zone for this characteristic
# Search by the second characteristic
response = self.client.get(
self.url,
{
"search": self.characteristic2.description_en,
},
)
self.assertEqual(response.status_code, 200)
data = response.data
self.assertEqual(len(data["items"]), 1)
self.assertEqual(data["items"][0]["count"], 1) # 1 zone for this characteristic
# Search by the third characteristic
response = self.client.get(
self.url,
{
"search": self.characteristic3.description_en,
},
)
data = response.data
self.assertEqual(response.status_code, 200)
self.assertEqual(len(data["items"]), 0)


class LivelihoodProductCategoryViewSetTestCase(APITestCase):
@classmethod
Expand Down
163 changes: 163 additions & 0 deletions apps/baseline/viewsets.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from django.apps import apps
from django.db import models
from django.db.models import F, OuterRef, Q, Subquery
from django.db.models.functions import Coalesce, NullIf
from django.utils.translation import override
from django_filters import rest_framework as filters
from django_filters.filters import CharFilter
from rest_framework.permissions import AllowAny
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet

from common.fields import translation_fields
Expand Down Expand Up @@ -188,6 +195,50 @@ class Meta:
label="Country",
)
population_estimate = filters.RangeFilter(label="Population estimate range")
product = CharFilter(method="filter_by_product", label="Filter by Product")
wealth_characteristic = CharFilter(
method="filter_by_wealth_characteristic", label="Filter by Wealth Characteristic"
)

def filter_by_product(self, queryset, name, value):
"""
Filter the baseline by matching products
"""
field_lookups = [
*[(field, "icontains") for field in translation_fields("product__common_name")],
("product__cpc", "istartswith"),
*[(field, "icontains") for field in translation_fields("product__description")],
("product__aliases", "icontains"),
]

q_object = Q()
for field, lookup in field_lookups:
q_object |= Q(**{f"{field}__{lookup}": value})

matching_baselines = LivelihoodStrategy.objects.filter(q_object).values("livelihood_zone_baseline")

return queryset.filter(id__in=Subquery(matching_baselines))

def filter_by_wealth_characteristic(self, queryset, name, value):
"""
Filter the baseline by matching wealth_characteristic
"""
field_lookups = [
*[(field, "icontains") for field in translation_fields("wealth_characteristic__name")],
("wealth_characteristic__code", "iexact"),
*[(field, "icontains") for field in translation_fields("wealth_characteristic__description")],
("wealth_characteristic__aliases", "icontains"),
]

q_object = Q()
for field, lookup in field_lookups:
q_object |= Q(**{f"{field}__{lookup}": value})

matching_baselines = WealthGroupCharacteristicValue.objects.filter(q_object).values(
"wealth_group__livelihood_zone_baseline"
)

return queryset.filter(id__in=Subquery(matching_baselines))


class LivelihoodZoneBaselineViewSet(BaseModelViewSet):
Expand Down Expand Up @@ -1778,3 +1829,115 @@ def get_calculations_on_aggregates(self):
slice_percent_field_name = self.serializer_class.slice_percent_field_name(field_name, aggregate)
calcs_on_aggregates[slice_percent_field_name] = expr
return calcs_on_aggregates


MODELS_TO_SEARCH = [
{
"app_name": "common",
"model_name": "ClassifiedProduct",
"filter": {"key": "product", "label": "Product", "category": "products"},
},
{
"app_name": "metadata",
"model_name": "LivelihoodCategory",
"filter": {"key": "main_livelihood_category", "label": "Main Livelihood Category", "category": "zone_types"},
},
{
"app_name": "baseline",
"model_name": "LivelihoodZone",
"filter": {"key": "livelihood_zone", "label": "Livelihood zone", "category": "zones"},
},
{
"app_name": "metadata",
"model_name": "WealthCharacteristic",
"filter": {"key": "wealth_characteristic", "label": "Items", "category": "items"},
},
{
"app_name": "common",
"model_name": "Country",
"filter": {"key": "country", "label": "Country", "category": "countries"},
},
]


class LivelihoodBaselineFacetedSearchView(APIView):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be LivelihoodZoneBaselineFacetedSearchView

"""
Performs a faceted search to find Livelihood Zone Baselines using a specified search term.

The search applies to multiple related models, filtering results based on the configured
criteria for each model. For each matching result, it calculates the number of unique
livelihood zones associated with the filter and includes relevant metadata in the response.
"""

renderer_classes = [JSONRenderer]
permission_classes = []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a better approach is:

    permission_classes = [AllowAny]

and no get_permissions. Does that work?


def get_permissions(self):
return [AllowAny()]

def get(self, request, format=None):
"""
Return a faceted set of matching filters
"""
results = {}
search_term = request.query_params.get("search", "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that search here should be read from settings.SEARCH_PARAM.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

language = request.query_params.get("language", "en")

if search_term:
for model_entry in MODELS_TO_SEARCH:
app_name = model_entry["app_name"]
model_name = model_entry["model_name"]
filter, filter_label, filter_category = (
model_entry["filter"]["key"],
model_entry["filter"]["label"],
model_entry["filter"]["category"],
)
ModelClass = apps.get_model(app_name, model_name)
search_per_model = ModelClass.objects.search(search_term)
results[filter_category] = []
# for activating language
with override(language):
for search_result in search_per_model:
if model_name == "ClassifiedProduct":
unique_zones = (
LivelihoodStrategy.objects.filter(product=search_result)
.values("livelihood_zone_baseline")
.distinct()
.count()
)
value_label, value = search_result.description, search_result.pk
elif model_name == "LivelihoodCategory":
unique_zones = LivelihoodZoneBaseline.objects.filter(
main_livelihood_category=search_result
).count()
value_label, value = search_result.description, search_result.pk
elif model_name == "LivelihoodZone":
unique_zones = LivelihoodZoneBaseline.objects.filter(livelihood_zone=search_result).count()
value_label, value = search_result.name, search_result.pk
elif model_name == "WealthCharacteristic":
unique_zones = (
WealthGroupCharacteristicValue.objects.filter(wealth_characteristic=search_result)
.values("wealth_group__livelihood_zone_baseline")
.distinct()
.count()
)
value_label, value = search_result.description, search_result.pk
elif model_name == "Country":
unique_zones = (
LivelihoodZoneBaseline.objects.filter(livelihood_zone__country=search_result)
.distinct()
.count()
)
value_label, value = search_result.iso_en_name, search_result.pk
if unique_zones > 0:
results[filter_category].append(
{
"filter": filter,
"filter_label": filter_label,
"value_label": value_label,
"value": value,
"count": unique_zones,
}
)

return Response(results)
Loading
Loading