The concept of this blogpost is quite simple - we will start with an imaginary company that has identified some threats to their storage accounts and follow the process of detection engineering. The field of detection engineering is in itself quite big and complex, so I will resort to some gross oversimplification (sorry!) while also trying to cover all the bases of what I think is important.
You might have heard the term “Detection Engineering Lifecycle”, and if not, you’re in luck. This is a process (notice I don’t say “the process”) used to develop and improve detection. Now, why is there no “the process” but rather “a process”? Well, there’s just so many different takes on it. Generally they all follow a simple flow like this:
graph LR
A[Planning] --> B[Development]
B --> C[Delivery]
C -->D[Improvement]
I’d say that rather than follow a process defined by a big global enterprise with more bullet points than I could count in a lifetime, try to limit the process or workflow to something you can actually manage with your current team. My take is something pretty simple:
I think that’s as basic as we get - we can obviously also dive into the technical aspects of how to set up the process, but I’ll just repeat myself once over and say build something that fits you. If your analyst and detection engineers prefer to work in their tools and create stuff there, align your flows to that, if they prefer working in Git with pull requests, align your flows to that.
graph LR
A(Risk)
B(Threat)
A --> C[Prioritization]
B --> C[Prioritization]
C --> D[Analysis]
D --> E[Code development]
E --> AUT["Automation/Deployment"]
AUT --> V[Validate]
V --> SV[Validate in production]
SV --> ST[Statistics]
SV -.-> |Feedback loop|C
In our scenario we are a company using Azure for storing files in storage account blobs. I’ve configured four storage accounts with blobs for the purpose of this test:
Let’s get started with some information gathering.
We don’t have any protection tooling in place and detection gap analysis shows that we are lacking detection for the entire kill-chain. Thus it’s decided to focus on whatever is the most prevalent. I mostly use Feedly as a tool for looking up new information, and for the enterprise versions they have MITRE mapping and more. For now I’ve identified something I can start working with:

This takes me to the following Microsoft article which in turn sends me on to a Microsoft Defender for Threat Intelligence report on storage blob attacks. Here we find some statistics on the amount techniques per tactic:

Based on this information, we will start gathering information for building a query to help us detect enumeration of storage accounts. If we head back over to Feedly, we can also (if we have the right license) export a MITRE Attack navigator layer of the TTPs in the article. This is useful for looking into specific enumerations tactics and techniques.
Heading back over to the MDTI report, if we scroll to the bottom we can find the following sample query that covers our usage perfectly:

Now this we can work with. The query is as follows:
let maxTimeBetweenRequests = 30s;
let maxWindowTime = 12h;
let timeRange = 30d;
let authTypes = dynamic(["Anonymous"]);
StorageBlobLogs
| where TimeGenerated > ago(timeRange)
// Collect anonymous requests to storage
| where AuthenticationType has_any(authTypes)
| where Uri !endswith "favicon.ico"
| where Category =~ "StorageRead"
// Process the filepath out of the request URI
| extend FilePath = array_slice(split(split(Uri, "?")[0], "/"), 3, -1)
| extend FullPath = strcat("/", strcat_array(FilePath, "/"))
| project
TimeGenerated,
AccountName,
FullPath,
CallerIpAddress,
UserAgentHeader,
StatusCode
| order by TimeGenerated asc
| serialize
// Generate sessions of access activity, where each request is within maxTimeBetweenRequests doens't last longer than maxWindowTime
| extend SessionStarted = row_window_session(TimeGenerated, maxWindowTime, maxTimeBetweenRequests, AccountName != prev(AccountName))
| order by TimeGenerated asc
// Summarize the results using the Session start time
| summarize Paths=make_list(FullPath), Statuses=make_set(StatusCode), CallerIPs=make_list(CallerIpAddress),
DistinctPathCount=dcount(FullPath), AllRequestsCount=count(), CallerIPCount=dcount(CallerIpAddress), CallerUACount=dcount(UserAgentHeader), SessionEnded=max(TimeGenerated)
by SessionStarted, AccountName
// Validate that each path visited is unique, scanners will generally try files once
| where DistinctPathCount > 1 and DistinctPathCount == AllRequestsCount
| order by DistinctPathCount
| extend ["Duration (Mins)"] = datetime_diff("minute", SessionEnded, SessionStarted)
| project
SessionStarted,
SessionEnded,
['Duration (Mins)'],
AccountName,
DistinctPathCount,
AllRequestsCount
With the query ready, let’s do some validation.
I run a tool call cloud_enum (find it here) with the following command to verify (-k allows me to define words to look for, can also use a wordlist with other options):
./cloud_enum -k demo1 -k demo2 -k demo3 -k demo4 --disable-aws --disable-gcp
As you can see, I also disable checks for AWS and GCP since I just wanted to validate for Azure. It looks something like this on my parrot VM:

However, when testing the query in Advanced Hunting in Defender XDR I only get results for the demo1, demo2 and demo4 accounts - not for the vulnerable demo3 account. Why is this?

Well, there’s a line in the query that compares the amount of requests with the distinct paths visited, which makes a lot of sense since a scanner would only visit each path once. It’s this line:
// Validate that each path visited is unique, scanners will generally try files once
| where DistinctPathCount > 1 and DistinctPathCount == AllRequestsCount
But what happens when a legitimate user is visiting the vulnerable blob while the scan is running? Well, the distinct path count is now lower than the amount of requests because we’ve visited some paths multiple times. If we remove the bottom part of the query we’ll actually get a bit closer:
let maxTimeBetweenRequests = 30s;
let maxWindowTime = 12h;
let timeRange = 30d;
let authTypes = dynamic(["Anonymous"]);
StorageBlobLogs
| where TimeGenerated > ago(timeRange)
// Collect anonymous requests to storage
| where AuthenticationType has_any(authTypes)
| where Uri !endswith "favicon.ico"
| where OperationName == "ListBlobs"
// Process the filepath out of the request URI
| extend FilePath = array_slice(split(split(Uri, "?")[0], "/"), 3, -1)
| extend FullPath = strcat("/", strcat_array(FilePath, "/"))
| project
TimeGenerated,
AccountName,
FullPath,
UserAgentHeader,
StatusCode
| order by TimeGenerated asc
| serialize
// Generate sessions of access activity, where each request is within maxTimeBetweenRequests doens't last longer than maxWindowTime
| extend SessionStarted = row_window_session(TimeGenerated, maxWindowTime, maxTimeBetweenRequests, AccountName != prev(AccountName))
| order by TimeGenerated asc
| summarize Paths=make_list(FullPath), Statuses=make_set(StatusCode),
DistinctPathCount=dcount(FullPath), AllRequestsCount=count(), CallerUACount=dcount(UserAgentHeader), SessionEnded=max(TimeGenerated)
by SessionStarted, AccountName
| project
SessionStarted,
SessionEnded,
['Duration (Mins)'],
AccountName,
DistinctPathCount,
AllRequestsCount
I removed the portion where we are calculating and filtering on the DistinctPathCount.

What’s the lesson here? Validating during query creation is well and good, but we need to test detection in prod to see if it actually works. Simulating “live” when people are working as normally is important for detection purposes. Deploying the query as it was might give you results, but it might also miss some crucial enumeration attempts.
At this point, we can deploy the query to prod and monitor it. We should validate it using statistics (true, false, benign positive rates) on regular intervals, and also have a feedback loop from whoever is handling our alerts in order to effectively deliver feedback (this rule isn’t very actionable, not sure what to do with this) to our detection engineering team.
Hope this article helps a bit!