Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allow nested accounts to be validated with accounting dimensions #41294

Closed
wants to merge 8 commits into from
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# Copyright, (c) 2020, Frappe Technologies Pvt. Ltd. and contributors
# For license information, please see license.txt

from typing import Any, Literal

import frappe
from frappe import _, scrub
from frappe.model.document import Document
from frappe.query_builder import DocType
from frappe.utils.nestedset import get_descendants_of


class AccountingDimensionFilter(Document):
Expand Down Expand Up @@ -40,20 +43,21 @@ def validate(self):
self.validate_applicable_accounts()

def validate_applicable_accounts(self):
accounts = frappe.db.sql(
"""
SELECT a.applicable_on_account as account
FROM `tabApplicable On Account` a, `tabAccounting Dimension Filter` d
WHERE d.name = a.parent
and d.name != %s
and d.accounting_dimension = %s
""",
(self.name, self.accounting_dimension),
as_dict=1,
ApplicableOnAccount = DocType("Applicable On Account")
AccountingDimensionFilter = DocType("Accounting Dimension Filter")

account_list = (
frappe.qb.from_(ApplicableOnAccount)
.inner_join(AccountingDimensionFilter)
.on(AccountingDimensionFilter.name == ApplicableOnAccount.parent)
.select(
(ApplicableOnAccount.applicable_on_account).as_("account"),
)
.where(AccountingDimensionFilter.name != self.name)
.where(AccountingDimensionFilter.accounting_dimension == self.accounting_dimension)
.run(as_list=True, pluck="account")
)

account_list = [d.account for d in accounts]

for account in self.get("accounts"):
if account.applicable_on_account in account_list:
frappe.throw(
Expand All @@ -65,46 +69,69 @@ def validate_applicable_accounts(self):
)


def get_dimension_filter_map():
if not frappe.flags.get("dimension_filter_map"):
filters = frappe.db.sql(
"""
SELECT
a.applicable_on_account, d.dimension_value, p.accounting_dimension,
p.allow_or_restrict, a.is_mandatory
FROM
`tabApplicable On Account` a,
`tabAccounting Dimension Filter` p
LEFT JOIN `tabAllowed Dimension` d ON d.parent = p.name
WHERE
p.name = a.parent
AND p.disabled = 0
""",
as_dict=1,
def get_dimension_filter_map() -> dict[tuple[str, str], dict[str, Any]]:
if frappe.flags.get("dimension_filter_map"):
return frappe.flags.dimension_filter_map

ApplicableOnAccount = DocType("Applicable On Account")
AllowedDimension = DocType("Allowed Dimension")
AccountingDimensionFilter = DocType("Accounting Dimension Filter")

filters = (
frappe.qb.from_(ApplicableOnAccount)
.inner_join(AccountingDimensionFilter)
.on(AccountingDimensionFilter.name == ApplicableOnAccount.parent)
.inner_join(AllowedDimension)
.on(AccountingDimensionFilter.name == AllowedDimension.parent)
.select(
ApplicableOnAccount.applicable_on_account,
ApplicableOnAccount.is_mandatory,
AllowedDimension.dimension_value,
AccountingDimensionFilter.accounting_dimension,
AccountingDimensionFilter.allow_or_restrict,
)
.run(as_dict=True)
)

dimension_filter_map = {}
dimension_filter_map = {}

for f in filters:
f.fieldname = scrub(f.accounting_dimension)
for f in filters:
f.fieldname = scrub(f.accounting_dimension)
if frappe.get_cached_value("Account", f.applicable_on_account, "is_group"):
accounts = get_descendants_of("Account", f.applicable_on_account)
else:
accounts = [f.applicable_on_account]

for account in accounts:
build_map(
dimension_filter_map,
f.fieldname,
f.applicable_on_account,
account,
f.dimension_value,
f.allow_or_restrict,
f.is_mandatory,
)
frappe.flags.dimension_filter_map = dimension_filter_map

frappe.flags.dimension_filter_map = dimension_filter_map
return frappe.flags.dimension_filter_map


def build_map(map_object, dimension, account, filter_value, allow_or_restrict, is_mandatory):
def build_map(
map_object: dict[tuple[str, str], dict[str, Any]],
dimension: str,
account: str,
filter_value: str,
allow_or_restrict: Literal["Allow", "Restrict"],
is_mandatory: bool,
):
map_object.setdefault(
(dimension, account),
{"allowed_dimensions": [], "is_mandatory": is_mandatory, "allow_or_restrict": allow_or_restrict},
{
"allowed_dimensions": [],
"is_mandatory": is_mandatory,
"allow_or_restrict": allow_or_restrict,
},
)

if filter_value:
map_object[(dimension, account)]["allowed_dimensions"].append(filter_value)
Loading